Flink接入Kafka专享版
更新时间:2023-07-05
接入准备
1. 购买专享版消息服务for Kafka集群
开通消息服务 for Kafka服务后,在控制台页面点击创建集群,即可进行购买,详情可参考创建集群。

2. 为购买的集群创建主题
在控制台页面点击集群名称,进入集群详情页面。
在左侧的边栏中点击主题管理,进入主题管理页面。

在主题管理页面点击创建主题,进行主题的创建,详情参考创建主题。
接入步骤
步骤一:获取集群接入点
具体请参考:查看集群接入点。
步骤二:添加Maven配置
                XML
                
            
            1<?xml version="1.0" encoding="UTF-8"?>
2<project xmlns="http://maven.apache.org/POM/4.0.0"
3         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5    <modelVersion>4.0.0</modelVersion>
6
7    <groupId>org.example</groupId>
8    <artifactId>flink_sdk</artifactId>
9    <version>1.0-SNAPSHOT</version>
10
11    <properties>
12        <maven.compiler.source>1.8</maven.compiler.source>
13        <maven.compiler.target>1.8</maven.compiler.target>
14        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15    </properties>
16
17    <dependencies>
18        <dependency>
19            <groupId>org.apache.kafka</groupId>
20            <artifactId>kafka-clients</artifactId>
21            <version>0.10.2.2</version>
22        </dependency>
23        <dependency>
24            <groupId>org.slf4j</groupId>
25            <artifactId>slf4j-simple</artifactId>
26            <version>1.7.25</version>
27            <scope>compile</scope>
28        </dependency>
29        <dependency>
30            <groupId>org.apache.flink</groupId>
31            <artifactId>flink-java</artifactId>
32            <version>1.6.1</version>
33        </dependency>
34        <dependency>
35            <groupId>org.apache.flink</groupId>
36            <artifactId>flink-streaming-java_2.11</artifactId>
37            <version>1.6.1</version>
38        </dependency>
39        <dependency>
40            <groupId>org.apache.flink</groupId>
41            <artifactId>flink-connector-kafka_2.11</artifactId>
42            <version>1.7.0</version>
43        </dependency>
44    </dependencies>
45
46    <build>
47        <plugins>
48            <plugin>
49                <groupId>org.apache.maven.plugins</groupId>
50                <artifactId>maven-shade-plugin</artifactId>
51                <version>3.2.4</version>
52                <executions>
53                    <execution>
54                        <phase>package</phase>
55                        <goals>
56                            <goal>shade</goal>
57                        </goals>
58                        <configuration>
59                            <transformers>
60                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
61                                    <mainClass>org.example.KafkaConsumerDemo</mainClass>
62                                </transformer>
63                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
64                                    <resource>reference.conf</resource>
65                                </transformer>
66                            </transformers>
67                        </configuration>
68                    </execution>
69                </executions>
70            </plugin>
71        </plugins>
72    </build>
73
74</project>
            步骤三:编写测试代码
- 需要关注并自行修改的参数
 
| 参数名 | 含义 | 
|---|---|
| access_point | 接入点信息 | 
| topic | 主题名称 | 
| value | 消息的具体内容 | 
| group_id | 消费组id | 
生产者示例代码
创建KafkaProducerDemo.java文件,具体代码示例如下:
                Java
                
            
            1package org.example;
2
3import org.apache.kafka.clients.producer.KafkaProducer;
4import org.apache.kafka.clients.producer.ProducerConfig;
5import org.apache.kafka.clients.producer.ProducerRecord;
6import org.apache.kafka.clients.producer.RecordMetadata;
7import org.apache.kafka.common.serialization.StringSerializer;
8
9import java.util.Properties;
10
11public class KafkaProducerDemo {
12    public static void main(String args[]) throws Exception {
13
14        //接入点设置
15        String access_point = "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092";
16        //填写需要发送消息的主题名称
17        String topic = "topic_name";
18        //填写需要发送消息
19        String value = "test flink message";
20        // 创建配置类
21        Properties props = new Properties();;
22        // 指定kafka服务端所在位置
23        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, access_point);
24        // Kafka消息的序列化方式。
25        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
26        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
27        // 请求的最长等待时间。
28        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
29        // 设置客户端内部重试次数。
30        props.put(ProducerConfig.RETRIES_CONFIG, 5);
31        // 设置客户端内部重试间隔。
32        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
33        // 构造 Producer 对象
34        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
35
36        try {
37            // 测试发送 100 条消息
38            for (int i = 0; i < 100; i++) {
39                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<>(topic, value + ": " + i);
40                kafkaProducer.send(kafkaMessage, (RecordMetadata recordMetadata, Exception e) -> {
41                    if (e == null) {
42                        System.out.println("send success:" + recordMetadata.toString());
43                    } else {
44                        e.printStackTrace();
45                        System.err.println("send fail");
46                    }
47                });
48            }
49        } catch (Exception e) {
50            System.out.println("Something error has happened");
51            e.printStackTrace();
52        } finally {
53            kafkaProducer.close();
54        }
55    }
56}
            消费者实例
创建KafkaConsumerDemo.java文件,具体代码示例如下
                Java
                
            
            1package org.example;
2
3import org.apache.flink.api.common.serialization.SimpleStringSchema;
4import org.apache.flink.streaming.api.datastream.DataStream;
5import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
6import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
7
8import java.util.Properties;
9
10
11public class KafkaConsumerDemo {
12    public static void main(String args[]) throws Exception {
13
14        //接入点设置
15        String access_point = "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092";
16        String group_id = "flink_group";
17        
18        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
19        // 创建配置类
20        Properties properties = new Properties();
21        // 设置接入点
22        properties.setProperty("bootstrap.servers", access_point);
23        // 设置消费组id
24        properties.setProperty("group.id", group_id);
25        // FlinkKafkaConsumer的第一个参数填创建的topic名称
26        DataStream<String> stream = env
27                .addSource(new FlinkKafkaConsumer<>("topic_name", new SimpleStringSchema(), properties));
28        // 打印输出
29        stream.print();
30        env.execute();
31    }
32}
            步骤四:编译并运行
通过maven工具将上述代码打包后,上传至指定的服务器,并执行以下命令:
                Shell
                
            
            1java -jar flink_sdk-1.0-SNAPSHOT.jar
            步骤五:查看集群监控
查看消息是否发送成功或消费成功有两种方式:
- 在服务器端查看jar包运行日志。
 - 在专享版消息服务 for Kafka控制台查看集群监控,获取集群生产、消息情况。
 
推荐使用第二种方式,下面介绍如何查看集群监控。
(1)在专享版消息服务 for Kafka的控制台页面找到需要连接的集群,点击集群名称进入集群详情页面。

(2)页面跳转后,进入左侧边中的集群详情页面。

(3)点击左侧边栏中的集群监控,进入集群监控页面。

(4)通过查看集群监控页面,提供的不同纬度的监控信息(集群监控、节点监控、主题监控、消费组监控),即可获知集群的生产和消费情况。
集群监控的具体使用请参考:集群监控

