KAFKA
Updated: 2023-12-07
KAFKA DDL
FLINK
CREATE TABLE kafka_table (
  `field01` STRING,
  `field02` BIGINT,
  `field03` FLOAT,
  `field04` BINARY,
  `field05` INT,
  `field06` TINYINT,
  `field07` BOOLEAN,
  `field08` DATE,
  `field09` DOUBLE,
  `field10` SMALLINT
) WITH (
  'connector.type' = 'KAFKA',
  'format.encode' = 'JSON',
  'connector.topic' = 'topic_name',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'test_group',
  'connector.properties.ssl.filename' = 'kafka-key.zip',
  'connector.read.startup.mode' = 'specific',
  'connector.read.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
KAFKA Parameter Settings
| Name | Short Name | Required | Example | SPARK | FLINK | Description |
|---|---|---|---|---|---|---|
| connector.type | type | Y | KAFKA | Y | Y | Service type |
| format.encode | encode | Y | JSON / CSV | Y | Y | Data encoding |
| connector.version | version | | 0.11 | | | Kafka cluster version |
| connector.topic | topic | Y | topicName | Y | Y | Kafka topic name |
| connector.properties.bootstrap.servers | bootstrap.servers | Y | localhost:9092 | Y | Y | Kafka server address |
| connector.properties.group.id | group.id | | test_group | | Y | Kafka consumer group ID |
| connector.properties.security.protocol | security.protocol | Y | SASL_PLAINTEXT / SASL_SSL / PLAINTEXT / SSL | Y | Y | Kafka security protocol |
| connector.properties.ssl.filename | ssl.filename | | kafka-key.zip | Y | Y | With an SSL protocol, the name of the Kafka certificate archive |
| connector.properties.sasl.mechanism | sasl.mechanism | | SCRAM-SHA-512 | Y | Y | With a SASL protocol, the SASL mechanism to use |
| connector.properties.sasl.jaas.config | sasl.jaas.config | | e.g. org.apache.kafka.common.security.scram.ScramLoginModule sufficient username="admin" password="password"; | Y | Y | With a SASL protocol, the contents of the JAAS config |
| connector.read.startup.mode | startup.mode | | earliest / latest / specific | | Y | Kafka startup mode |
| connector.read.startup.specific-offsets | startup.specific-offsets | | partition:0,offset:42;partition:1,offset:300 | | Y | Specific Kafka startup offsets |
| connector.read.starting-offsets | starting-offsets | | earliest / latest / {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} | Y | | Kafka starting position |
| connector.read.ending-offsets | ending-offsets | | {"topic1":{"0":50,"1":-1},"topic2":{"0":-1}} | Y | | Kafka ending position |
| connector.read.fail-on-data-loss | fail-on-data-loss | | true / false | Y | | Whether to fail the job when data may have been lost (e.g. offsets out of range) |
| connector.read.max-offsets-per-trigger | max-offsets-per-trigger | | 1000 | Y | | Maximum number of offsets processed per trigger interval |
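The security-related parameters above are typically used together. Below is a minimal sketch of a FLINK table reading from a SASL_SSL-secured cluster, assuming the example values from the table (SCRAM-SHA-512 mechanism, a certificate archive named kafka-key.zip, and the admin/password credentials); substitute your own cluster's values:

```sql
CREATE TABLE kafka_secure_table (
  `field01` STRING
) WITH (
  'connector.type' = 'KAFKA',
  'format.encode' = 'JSON',
  'connector.topic' = 'topic_name',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'test_group',
  -- SASL_SSL combines the SSL certificate options with the SASL credential options
  'connector.properties.security.protocol' = 'SASL_SSL',
  'connector.properties.ssl.filename' = 'kafka-key.zip',
  'connector.properties.sasl.mechanism' = 'SCRAM-SHA-512',
  'connector.properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule sufficient username="admin" password="password";',
  'connector.read.startup.mode' = 'earliest'
);
```

With PLAINTEXT or SSL protocols the sasl.mechanism and sasl.jaas.config options can be omitted; with SASL_PLAINTEXT the ssl.filename option is not needed.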