VPC网络PLAINTEXT方式生产和消费
更新时间:2024-06-18
在同 VPC 网络下访问,使用 PLAINTEXT 协议接入,接入点可以在 集群详情 页面查看。
环境准备
- 安装GCC
- 安装C++ 依赖库。
Shell
1yum install librdkafka-devel
2yum install cyrus-sasl
3yum install cyrus-sasl-scram
集群准备
1. 购买专享版消息服务for Kafka集群
开通消息服务 for Kafka服务后,在控制台页面点击『创建集群』,即可进行购买。
2. 为购买的集群创建主题
在控制台页面点击集群名称,进入集群详情页面。
在左侧的边栏中点击『主题管理』,进入主题管理页面。
在主题管理页面点击『创建主题』,进行主题的创建。
使用步骤:
步骤一:获取集群接入点
具体请参考:接入点查看。
步骤二:编写测试代码
生产者代码示例
创建KafkaProducerDemo.c文件,具体代码示例如下:
C
1/*
2* librdkafka - Apache Kafka C library
3*
4* Copyright (c) 2017, Magnus Edenhill
5* All rights reserved.
6*
7* Redistribution and use in source and binary forms, with or without
8* modification, are permitted provided that the following conditions are met:
9*
10* 1. Redistributions of source code must retain the above copyright notice,
11* this list of conditions and the following disclaimer.
12* 2. Redistributions in binary form must reproduce the above copyright notice,
13* this list of conditions and the following disclaimer in the documentation
14* and/or other materials provided with the distribution.
15*
16* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26* POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/**
30* Simple Apache Kafka producer
31* using the Kafka driver from librdkafka
32* (https://github.com/edenhill/librdkafka)
33 */
34
35#include <stdio.h>
36#include <signal.h>
37#include <string.h>
38
39
40/* Typical include path would be <librdkafka/rdkafka.h>, but this program
41* is builtin from within the librdkafka source tree and thus differs. */
42 #include "librdkafka/rdkafka.h"
43
44
/* Main-loop flag: stays non-zero while the producer should keep reading input. */
static volatile sig_atomic_t run = 1;

/**
 * @brief Signal handler that requests termination of the program.
 *
 * Clears the `run` flag and closes stdin so that a pending fgets() on
 * stdin stops waiting for input.
 *
 * @param sig Signal number that was delivered (not used).
 */
static void stop(int sig) {
    (void)sig;

    run = 0;
    fclose(stdin); /* abort fgets() */
}
54
55
56/**
57* @brief Message delivery report callback.
58*
59* This callback is called exactly once per message, indicating if
60* the message was succesfully delivered
61* (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
62* failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
63*
64* The callback is triggered from rd_kafka_poll() and executes on
65* the application's thread.
66 */
67 static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
68 if (rkmessage->err) {
69 fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
70 } else {
71 fprintf(stderr, "%% Message delivered (%zd bytes, partition %" PRId32 ")\n", rkmessage->len, rkmessage->partition);
72 }
73
74 /* The rkmessage is destroyed automatically by librdkafka */
75 }
76
77
78
79int main(int argc, char **argv) {
80rd_kafka_t *rk; /* Producer instance handle */
81rd_kafka_conf_t *conf; /* Temporary configuration object */
82
83 char errstr[512]; /* librdkafka API error reporting buffer */
84 char buf[512]; /* Message value temporary buffer */
85
86 const char *brokers; /* Argument: broker list */
87 const char *topic; /* Argument: topic to produce to */
88 // const char *username; /* Argument: sasl username */
89 // const char *password; /* Argument: sasl password */
90
91 /*
92 * Argument validation
93 */
94
95 // 检查参数配置
96 if (argc != 3) {
97 fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
98 return 1;
99 }
100
101 // 接入点信息
102 brokers = argv[1];
103 // 主题名称
104 topic = argv[2];
105
106
107 /*
108 * Create Kafka client configuration place-holder
109 */
110 conf = rd_kafka_conf_new();
111
112 /* Set bootstrap broker(s) as a comma-separated list of
113 * host or host:port (default port 9092).
114 * librdkafka will use the bootstrap brokers to acquire the full
115 * set of brokers from the cluster. */
116 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
117 fprintf(stderr, "%s\n", errstr);
118 return 1;
119 }
120
121 if (
122 rd_kafka_conf_set(conf, "security.protocol", "plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
123 ) {
124 fprintf(stderr, "%s\n", errstr);
125 return -1;
126 }
127
128 /* Set the delivery report callback.
129 * This callback will be called once per message to inform
130 * the application if delivery succeeded or failed.
131 * See dr_msg_cb() above.
132 * The callback is only triggered from rd_kafka_poll() and
133 * rd_kafka_flush(). */
134 rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
135
136 /*
137 * Create producer instance.
138 *
139 * NOTE: rd_kafka_new() takes ownership of the conf object
140 * and the application must not reference it again after
141 * this call.
142 */
143
144 rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
145 if (!rk) {
146 fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
147 return 1;
148 }
149
150 /* Signal handler for clean shutdown */
151 signal(SIGINT, stop);
152
153 fprintf(stderr,
154 "%% Type some text and hit enter to produce message\n"
155 "%% Or just hit enter to only serve delivery reports\n"
156 "%% Press Ctrl-C or Ctrl-D to exit\n");
157
158 while (run && fgets(buf, sizeof(buf), stdin)) {
159 size_t len = strlen(buf);
160 rd_kafka_resp_err_t err;
161
162 /* Remove newline */
163 if (buf[len - 1] == '\n') {
164 buf[--len] = '\0';
165 }
166
167 /* Empty line: only serve delivery reports */
168 if (len == 0) {
169 rd_kafka_poll(rk, 0/*non-blocking */);
170 continue;
171 }
172
173 /*
174 * Send/Produce message.
175 * This is an asynchronous call, on success it will only
176 * enqueue the message on the internal producer queue.
177 * The actual delivery attempts to the broker are handled
178 * by background threads.
179 * The previously registered delivery report callback
180 * (dr_msg_cb) is used to signal back to the application
181 * when the message has been delivered (or failed).
182 */
183 retry:
184 err = rd_kafka_producev(
185 /* Producer handle */
186 rk,
187 /* Topic name */
188 RD_KAFKA_V_TOPIC(topic),
189 /* Make a copy of the payload. */
190 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
191 /* Message value and length */
192 RD_KAFKA_V_VALUE(buf, len),
193 /* Per-Message opaque, provided in
194 * delivery report callback as
195 * msg_opaque. */
196 RD_KAFKA_V_OPAQUE(NULL),
197 /* End sentinel */
198 RD_KAFKA_V_END
199 );
200
201 if (err) {
202 /*
203 * Failed to *enqueue* message for producing.
204 */
205 fprintf(stderr,
206 "%% Failed to produce to topic %s: %s\n", topic,
207 rd_kafka_err2str(err));
208
209 if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
210 /* If the internal queue is full, wait for
211 * messages to be delivered and then retry.
212 * The internal queue represents both
213 * messages to be sent and messages that have
214 * been sent or failed, awaiting their
215 * delivery report callback to be called.
216 *
217 * The internal queue is limited by the
218 * configuration property
219 * queue.buffering.max.messages */
220 rd_kafka_poll(rk, 1000 /*block for max 1000ms*/);
221 goto retry;
222 }
223 } else {
224 fprintf(stderr, "%% Enqueued message (%zd bytes) "
225 "for topic %s\n",
226 len, topic);
227 }
228
229
230 /* A producer application should continually serve
231 * the delivery report queue by calling rd_kafka_poll()
232 * at frequent intervals.
233 * Either put the poll call in your main loop, or in a
234 * dedicated thread, or call it after every
235 * rd_kafka_produce() call.
236 * Just make sure that rd_kafka_poll() is still called
237 * during periods where you are not producing any messages
238 * to make sure previously produced messages have their
239 * delivery report callback served (and any other callbacks
240 * you register). */
241 rd_kafka_poll(rk, 0 /*non-blocking*/);
242 }
243
244
245 /* Wait for final messages to be delivered or fail.
246 * rd_kafka_flush() is an abstraction over rd_kafka_poll() which
247 * waits for all messages to be delivered. */
248 fprintf(stderr, "%% Flushing final messages..\n");
249 rd_kafka_flush(rk, 10 * 1000 /* wait for max 10 seconds */);
250
251 /* If the output queue is still not empty there is an issue
252 * with producing messages to the clusters. */
253 if (rd_kafka_outq_len(rk) > 0) {
254 fprintf(stderr, "%% %d message(s) were not delivered\n", rd_kafka_outq_len(rk));
255 }
256
257 /* Destroy the producer instance */
258 rd_kafka_destroy(rk);
259
260 return 0;
261}
消费者代码示例
创建KafkaConsumerDemo.c文件,具体代码示例如下:
C
1/*
2* librdkafka - Apache Kafka C library
3*
4* Copyright (c) 2019, Magnus Edenhill
5* All rights reserved.
6*
7* Redistribution and use in source and binary forms, with or without
8* modification, are permitted provided that the following conditions are met:
9*
10* 1. Redistributions of source code must retain the above copyright notice,
11* this list of conditions and the following disclaimer.
12* 2. Redistributions in binary form must reproduce the above copyright notice,
13* this list of conditions and the following disclaimer in the documentation
14* and/or other materials provided with the distribution.
15*
16* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26* POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/**
30* Simple high-level balanced Apache Kafka consumer
31* using the Kafka driver from librdkafka
32* (https://github.com/edenhill/librdkafka)
33 */
34
35#include <stdio.h>
36#include <signal.h>
37#include <string.h>
38#include <ctype.h>
39
40
41/* Typical include path would be <librdkafka/rdkafka.h>, but this program
42* is builtin from within the librdkafka source tree and thus differs. */
43 //#include <librdkafka/rdkafka.h>
44 #include "librdkafka/rdkafka.h"
45
46
/* Main-loop flag: the consumer keeps polling while this is non-zero. */
static volatile sig_atomic_t run = 1;

/**
 * @brief Signal handler that requests termination of the program.
 *
 * Only clears the `run` flag.
 *
 * @param sig Signal number that was delivered (not used).
 */
static void stop(int sig) {
    (void)sig;

    run = 0;
}
55
56
57
/**
 * @brief Check whether a byte buffer consists solely of printable characters.
 *
 * Bytes are read as unsigned char before being handed to isprint(),
 * because passing a negative value (a byte >= 0x80 where plain char is
 * signed) to a <ctype.h> function is undefined behavior.
 *
 * @param buf  Buffer to inspect; need not be NUL-terminated.
 * @param size Number of bytes of buf to inspect.
 *
 * @returns 1 if all bytes are printable (also for size 0), else 0.
 */
static int is_printable(const char *buf, size_t size) {
    const unsigned char *bytes = (const unsigned char *)buf;
    size_t i;

    for (i = 0; i < size; i++) {
        if (!isprint(bytes[i])) {
            return 0;
        }
    }

    return 1;
}
72
73
74int main(int argc, char **argv) {
75rd_kafka_t *rk; /* Consumer instance handle */
76rd_kafka_conf_t *conf; /* Temporary configuration object */
77rd_kafka_resp_err_t err; /* librdkafka API error code */
78
79 char errstr[512]; /* librdkafka API error reporting buffer */
80
81 const char *brokers; /* Argument: broker list */
82 const char *groupid; /* Argument: Consumer group id */
83 char **topics; /* Argument: list of topics to subscribe to */
84
85 int topic_cnt; /* Number of topics to subscribe to */
86 rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
87 int i;
88
89 /*
90 * Argument validation
91 */
92
93 // 检查参数配置
94 if (argc < 4) {
95 fprintf(stderr, "%% Usage: %s <broker> <group.id> <username> <password> <topic1> <topic2>..\n", argv[0]);
96 return 1;
97 }
98
99 // 接入点信息
100 brokers = argv[1];
101 // 消费组 id
102 groupid = argv[2];
103 // 主题名称
104 topics = &argv[3];
105
106 topic_cnt = argc - 3;
107
108
109 /*
110 * Create Kafka client configuration place-holder
111 */
112 conf = rd_kafka_conf_new();
113
114 /* Set bootstrap broker(s) as a comma-separated list of
115 * host or host:port (default port 9092).
116 * librdkafka will use the bootstrap brokers to acquire the full
117 * set of brokers from the cluster. */
118 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
119 fprintf(stderr, "%s\n", errstr);
120 rd_kafka_conf_destroy(conf);
121 return 1;
122 }
123
124 if (
125 rd_kafka_conf_set(conf, "security.protocol", "plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
126 ) {
127 fprintf(stderr, "%s\n", errstr);
128 return -1;
129 }
130
131 /* Set the consumer group id.
132 * All consumers sharing the same group id will join the same
133 * group, and the subscribed topic' partitions will be assigned
134 * according to the partition.assignment.strategy
135 * (consumer config property) to the consumers in the group. */
136 if (rd_kafka_conf_set(conf, "group.id", groupid, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
137 fprintf(stderr, "%s\n", errstr);
138 rd_kafka_conf_destroy(conf);
139 return 1;
140 }
141
142 /* If there is no previously committed offset for a partition
143 * the auto.offset.reset strategy will be used to decide where
144 * in the partition to start fetching messages.
145 * By setting this to earliest the consumer will read all messages
146 * in the partition if there was no previously committed offset. */
147 if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
148 fprintf(stderr, "%s\n", errstr);
149 rd_kafka_conf_destroy(conf);
150 return 1;
151 }
152
153 /*
154 * Create consumer instance.
155 *
156 * NOTE: rd_kafka_new() takes ownership of the conf object
157 * and the application must not reference it again after
158 * this call.
159 */
160 rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
161 if (!rk) {
162 fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
163 return 1;
164 }
165
166 conf = NULL; /* Configuration object is now owned, and freed,
167 * by the rd_kafka_t instance. */
168
169
170 /* Redirect all messages from per-partition queues to
171 * the main queue so that messages can be consumed with one
172 * call from all assigned partitions.
173 *
174 * The alternative is to poll the main queue (for events)
175 * and each partition queue separately, which requires setting
176 * up a rebalance callback and keeping track of the assignment:
177 * but that is more complex and typically not recommended. */
178 rd_kafka_poll_set_consumer(rk);
179
180
181 /* Convert the list of topics to a format suitable for librdkafka */
182 subscription = rd_kafka_topic_partition_list_new(topic_cnt);
183 for (i = 0; i < topic_cnt; i++) {
184 rd_kafka_topic_partition_list_add(
185 subscription,
186 topics[i],
187 /* the partition is ignored
188 * by subscribe() */
189 RD_KAFKA_PARTITION_UA
190 );
191 }
192
193 /* Subscribe to the list of topics */
194 err = rd_kafka_subscribe(rk, subscription);
195 if (err) {
196 fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n", subscription->cnt, rd_kafka_err2str(err));
197 rd_kafka_topic_partition_list_destroy(subscription);
198 rd_kafka_destroy(rk);
199 return 1;
200 }
201
202 fprintf(stderr,
203 "%% Subscribed to %d topic(s), "
204 "waiting for rebalance and messages...\n",
205 subscription->cnt);
206
207 rd_kafka_topic_partition_list_destroy(subscription);
208
209
210 /* Signal handler for clean shutdown */
211 signal(SIGINT, stop);
212
213 /* Subscribing to topics will trigger a group rebalance
214 * which may take some time to finish, but there is no need
215 * for the application to handle this idle period in a special way
216 * since a rebalance may happen at any time.
217 * Start polling for messages. */
218
219 while (run) {
220 rd_kafka_message_t *rkm;
221
222 rkm = rd_kafka_consumer_poll(rk, 100);
223 if (!rkm) {
224 continue; /* Timeout: no message within 100ms,
225 * try again. This short timeout allows
226 * checking for `run` at frequent intervals.
227 */
228 }
229
230 /* consumer_poll() will return either a proper message
231 * or a consumer error (rkm->err is set). */
232 if (rkm->err) {
233 /* Consumer errors are generally to be considered
234 * informational as the consumer will automatically
235 * try to recover from all types of errors. */
236 fprintf(stderr, "%% Consumer error: %s\n", rd_kafka_message_errstr(rkm));
237 rd_kafka_message_destroy(rkm);
238 continue;
239 }
240
241 /* Proper message. */
242 printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
243 rd_kafka_topic_name(rkm->rkt), rkm->partition,
244 rkm->offset);
245
246 /* Print the message key. */
247 if (rkm->key && is_printable(rkm->key, rkm->key_len)) {
248 printf(" Key: %.*s\n", (int)rkm->key_len, (const char *)rkm->key);
249 } else if (rkm->key) {
250 printf(" Key: (%d bytes)\n", (int)rkm->key_len);
251 }
252
253 /* Print the message value/payload. */
254 if (rkm->payload && is_printable(rkm->payload, rkm->len)) {
255 printf(" Value: %.*s\n", (int)rkm->len, (const char *)rkm->payload);
256 } else if (rkm->payload) {
257 printf(" Value: (%d bytes)\n", (int)rkm->len);
258 }
259
260 rd_kafka_message_destroy(rkm);
261 }
262
263
264 /* Close the consumer: commit final offsets and leave the group. */
265 fprintf(stderr, "%% Closing consumer\n");
266 rd_kafka_consumer_close(rk);
267
268
269 /* Destroy the consumer */
270 rd_kafka_destroy(rk);
271
272 return 0;
273}
步骤三:编译并运行
编译并运行上述两个代码文件。
Bash
1# 启动消费者
2gcc ./KafkaConsumerDemo.c -o consumer -lrdkafka
3./consumer <broker> <group.id> <topic1> <topic2>..
4# 启动生产者
5gcc ./KafkaProducerDemo.c -o producer -lrdkafka
6./producer <broker> <topic>
步骤四:查看集群监控
查看消息是否发送成功或消费成功有两种方式:
- 在服务器端/控制台查看日志。
- 在专享版消息服务 for Kafka控制台查看集群监控,获取集群生产、消费情况。
推荐使用第二种方式,下面介绍如何查看集群监控。
(1)在专享版消息服务 for Kafka的控制台页面找到需要连接的集群,点击集群名称进入『集群详情』页面。
(2)页面跳转后,进入左侧边栏中的『集群详情』页面。
(3)点击左侧边栏中的『集群监控』,进入『集群监控』页面。
(4)通过查看『集群监控』页面提供的不同维度的监控信息(集群监控、节点监控、主题监控、消费组监控),即可获知集群的生产和消费情况。
集群监控的具体使用请参考:集群监控