Flink接入Kafka专享版
更新时间:2023-07-05
接入准备
1. 购买专享版消息服务for Kafka集群
开通消息服务 for Kafka服务后,在控制台页面点击创建集群,即可进行购买,详情可参考创建集群。
2. 为购买的集群创建主题
在控制台页面点击集群名称,进入集群详情页面。
在左侧的边栏中点击主题管理,进入主题管理页面。
在主题管理页面点击创建主题,进行主题的创建,详情参考创建主题。
接入步骤
步骤一:获取集群接入点
具体请参考:查看集群接入点。
步骤二:添加Maven配置
XML
1<?xml version="1.0" encoding="UTF-8"?>
2<project xmlns="http://maven.apache.org/POM/4.0.0"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0</modelVersion>
6
7 <groupId>org.example</groupId>
8 <artifactId>flink_sdk</artifactId>
9 <version>1.0-SNAPSHOT</version>
10
11 <properties>
12 <maven.compiler.source>1.8</maven.compiler.source>
13 <maven.compiler.target>1.8</maven.compiler.target>
14 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15 </properties>
16
17 <dependencies>
18 <dependency>
19 <groupId>org.apache.kafka</groupId>
20 <artifactId>kafka-clients</artifactId>
21 <version>0.10.2.2</version>
22 </dependency>
23 <dependency>
24 <groupId>org.slf4j</groupId>
25 <artifactId>slf4j-simple</artifactId>
26 <version>1.7.25</version>
27 <scope>compile</scope>
28 </dependency>
29 <dependency>
30 <groupId>org.apache.flink</groupId>
31 <artifactId>flink-java</artifactId>
32 <version>1.6.1</version>
33 </dependency>
34 <dependency>
35 <groupId>org.apache.flink</groupId>
36 <artifactId>flink-streaming-java_2.11</artifactId>
37 <version>1.6.1</version>
38 </dependency>
39 <dependency>
40 <groupId>org.apache.flink</groupId>
41 <artifactId>flink-connector-kafka_2.11</artifactId>
42 <version>1.7.0</version>
43 </dependency>
44 </dependencies>
45
46 <build>
47 <plugins>
48 <plugin>
49 <groupId>org.apache.maven.plugins</groupId>
50 <artifactId>maven-shade-plugin</artifactId>
51 <version>3.2.4</version>
52 <executions>
53 <execution>
54 <phase>package</phase>
55 <goals>
56 <goal>shade</goal>
57 </goals>
58 <configuration>
59 <transformers>
60 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
61 <mainClass>org.example.KafkaConsumerDemo</mainClass>
62 </transformer>
63 <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
64 <resource>reference.conf</resource>
65 </transformer>
66 </transformers>
67 </configuration>
68 </execution>
69 </executions>
70 </plugin>
71 </plugins>
72 </build>
73
74</project>
步骤三:编写测试代码
- 需要关注并自行修改的参数
参数名 | 含义 |
---|---|
access_point | 接入点信息 |
topic | 主题名称 |
value | 消息的具体内容 |
group_id | 消费组id |
生产者示例代码
创建KafkaProducerDemo.java文件,具体代码示例如下:
Java
1package org.example;
2
3import org.apache.kafka.clients.producer.KafkaProducer;
4import org.apache.kafka.clients.producer.ProducerConfig;
5import org.apache.kafka.clients.producer.ProducerRecord;
6import org.apache.kafka.clients.producer.RecordMetadata;
7import org.apache.kafka.common.serialization.StringSerializer;
8
9import java.util.Properties;
10
11public class KafkaProducerDemo {
12 public static void main(String args[]) throws Exception {
13
14 //接入点设置
15 String access_point = "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092";
16 //填写需要发送消息的主题名称
17 String topic = "topic_name";
18 //填写需要发送消息
19 String value = "test flink message";
20 // 创建配置类
21 Properties props = new Properties();;
22 // 指定kafka服务端所在位置
23 props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, access_point);
24 // Kafka消息的序列化方式。
25 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
26 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
27 // 请求的最长等待时间。
28 props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
29 // 设置客户端内部重试次数。
30 props.put(ProducerConfig.RETRIES_CONFIG, 5);
31 // 设置客户端内部重试间隔。
32 props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
33 // 构造 Producer 对象
34 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
35
36 try {
37 // 测试发送 100 条消息
38 for (int i = 0; i < 100; i++) {
39 ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, value + ": " + i);
40 kafkaProducer.send(kafkaMessage, (RecordMetadata recordMetadata, Exception e) -> {
41 if (e == null) {
42 System.out.println("send success:" + recordMetadata.toString());
43 } else {
44 e.printStackTrace();
45 System.err.println("send fail");
46 }
47 });
48 }
49 } catch (Exception e) {
50 System.out.println("Something error has happened");
51 e.printStackTrace();
52 } finally {
53 kafkaProducer.close();
54 }
55 }
56}
消费者实例
创建KafkaConsumerDemo.java文件,具体代码示例如下
Java
1package org.example;
2
3import org.apache.flink.api.common.serialization.SimpleStringSchema;
4import org.apache.flink.streaming.api.datastream.DataStream;
5import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
6import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
7
8import java.util.Properties;
9
10
11public class KafkaConsumerDemo {
12 public static void main(String args[]) throws Exception {
13
14 //接入点设置
15 String access_point = "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092";
16 String group_id = "flink_group";
17
18 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
19 // 创建配置类
20 Properties properties = new Properties();
21 // 设置接入点
22 properties.setProperty("bootstrap.servers", access_point);
23 // 设置消费组id
24 properties.setProperty("group.id", group_id);
25 // FlinkKafkaConsumer的第一个参数填创建的topic名称
26 DataStream<String> stream = env
27 .addSource(new FlinkKafkaConsumer<>("topic_name", new SimpleStringSchema(), properties));
28 // 打印输出
29 stream.print();
30 env.execute();
31 }
32}
步骤四:编译并运行
通过maven工具将上述代码打包后,上传至指定的服务器,并执行以下命令:
Shell
1java -jar flink_sdk-1.0-SNAPSHOT.jar
步骤五:查看集群监控
查看消息是否发送成功或消费成功有两种方式:
- 在服务器端查看jar包运行日志。
- 在专享版消息服务 for Kafka控制台查看集群监控,获取集群生产、消息情况。
推荐使用第二种方式,下面介绍如何查看集群监控。
(1)在专享版消息服务 for Kafka的控制台页面找到需要连接的集群,点击集群名称进入集群详情页面。
(2)页面跳转后,进入左侧边中的集群详情页面。
(3)点击左侧边栏中的集群监控,进入集群监控页面。
(4)通过查看集群监控页面,提供的不同纬度的监控信息(集群监控、节点监控、主题监控、消费组监控),即可获知集群的生产和消费情况。
集群监控的具体使用请参考:集群监控