从Flink导入
更新时间:2025-01-20
使用Flink ClickHouse 连接器进行导入。
表一 连接器选择
选项 | 默认 | 类型 | 描述 | |
---|---|---|---|---|
网址 | 必填 | none | String | 格式的 ClickHouse jdbc url clickhouse://<host>:<port> |
用户名 | 选填 | none | String | 如果指定了“用户名”和“密码”,则必须同时指定。 |
密码 | 选填 | none | String | ClickHouse 密码。 |
数据库名称 | 选填 | default | String | ClickHouse 数据库名称。 |
表名 | 必填 | none | String | ClickHouse 表名。 |
使用本地 | 选填 | false | Boolean | 在分布式表引擎的情况下直接读取/写入本地表。 |
sink.flush 间隔 | 选填 | 1000 | Integer | 最大刷新大小,超过此大小将会刷新数据。 |
sink.flush 间隔 | 选填 | 1s | Duration | 在此刷新间隔时间内,异步线程将刷新数据。 |
sink.max-重试次数 | 选填 | 3 | Integer | 将记录写入数据库失败时的最大重试次数。 |
sink.update 策略 | 选填 | update | String | 将 UPDATE_AFTER 类型的记录转换为更新/插入语句或者直接丢弃它,可用:更新、插入、丢弃。 |
sink.partition-策略 | 选填 | balanced | String | 分区策略:平衡(循环)、哈希(分区键)、随机(随机)。 |
接收器.分区键 | 选填 | none | String | 用于哈希策略的分区键。 |
接收器分片使用表定义 | 选填 | false | Boolean | 分片策略与分布式表定义一致,若设置为true,则会覆盖sink.partition-strategy 和sink.partition-key 的配置。 |
sink.ignore-删除 | 选填 | true | Integer | 是否忽略删除语句。 |
接收器并行性 | 选填 | none | String | 为接收器定义自定义并行性。 |
扫描.分区.列 | 选填 | none | Integer | 用于对输入进行分区的列名。 |
扫描分区号 | 选填 | none | Long | 分区的数量。 |
扫描分区下限 | 选填 | none | Long | 第一个分区的最小值。 |
是否忽略主键 | 选填 | true | Boolean | 使用 ClickHouseCatalog 创建表时是否忽略主键。 |
特性 | 选填 | none | String | 这可以设置并传递clickhouse-jdbc 配置。 |
查找缓存 | 选填 | none | String | 该查询表的缓存策略,包括NONE和PARTIAL(暂不支持FULL) |
查找部分缓存访问后过期 | 选填 | none | Duration | 访问后缓存中的条目过期的持续时间,超过此时间,最旧的行将会过期。 |
查找部分缓存写入后过期 | 选填 | none | Duration | 写入后缓存中的条目过期的持续时间,超过此时间,最旧的行将会过期。 |
查找缓存最大行数 | 选填 | none | Long | 查找缓存的最大行数,超过此值,最旧的行将会过期。 |
查找部分缓存缺少键 | 选填 | true | Boolean | 标记缓存丢失的密钥,默认为 true |
查找最大重试数 | 选填 | 3 | Integer | 查找数据库失败时的最大重试次数。 |
更新/删除数据注意事项:
- 分布式表不支持更新/删除语句,如果要使用更新/删除报表,请确保将记录写入本地表或将use local设置为true。
- 数据由主键更新和删除,在分区表中使用时请注意这一点。
数据类型映射
Flink 类型 | ClickHouse 类型 |
---|---|
CHAR | String |
VARCHAR | String / IP / UUID |
STRING | String / Enum |
BOOLEAN | UInt8 |
BYTES | FixedString |
DECIMAL | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 |
TINYINT | Int8 |
SMALLINT | Int16 / UInt8 |
INTEGER | Int32 / UInt16 / Interval |
BIGINT | Int64 / UInt32 |
FLOAT | Float32 |
DOUBLE | Float64 |
DATE | Date |
TIME | DateTime |
TIMESTAMP | DateTime |
TIMESTAMP_LTZ | DateTime |
INTERVAL_YEAR_MONTH | Int32 |
INTERVAL_DAY_TIME | Int64 |
ARRAY | Array |
MAP | Map |
ROW | Not supported |
MULTISET | Not supported |
RAW | Not supported |
Maven依赖关系
该项目未发布到maven中央存储库,在使用之前,需要部署/安装到自己的存储库,步骤如下:
Plain Text
1 # clone the project
2git clone https://github.com/itinycheng/flink-connector-clickhouse.git
3
4 # enter the project directory
5cd flink-connector-clickhouse/
6
7 # display remote branches
8git branch -r
9
10 # checkout the branch you need
11git checkout $branch_name
12
13 # install or deploy the project to our own repository
14mvn clean install -DskipTests
15mvn clean deploy -DskipTests
Plain Text
1<dependency>
2 <groupId>org.apache.flink</groupId>
3 <artifactId>flink-connector-clickhouse</artifactId>
4 <version>1.16.0-SNAPSHOT</version>
5</dependency>
使用步骤
- 创建和读/写表:
Plain Text
1-- register a clickhouse table `t_user` in flink sql.
2CREATE TABLE t_user (
3 `user_id` BIGINT,
4 `user_type` INTEGER,
5 `language` STRING,
6 `country` STRING,
7 `gender` STRING,
8 `score` DOUBLE,
9 `list` ARRAY<STRING>,
10 `map` Map<STRING, BIGINT>,
11 PRIMARY KEY (`user_id`) NOT ENFORCED
12) WITH (
13 'connector' = 'clickhouse',
14 'url' = 'clickhouse://{ip}:{port}',
15 'database-name' = 'tutorial',
16 'table-name' = 'users',
17 'sink.batch-size' = '500',
18 'sink.flush-interval' = '1000',
19 'sink.max-retries' = '3'
20);
21
22-- read data from clickhouse
23SELECT user_id, user_type from t_user;
24
25-- write data into the clickhouse table from the table `T`
26INSERT INTO t_user
27SELECT cast(`user_id` as BIGINT), `user_type`, `lang`, `country`, `gender`, `score`, ARRAY['CODER', 'SPORTSMAN'], CAST(MAP['BABA', cast(10 as BIGINT), 'NIO', cast(8 as BIGINT)] AS MAP<STRING, BIGINT>) FROM T;
- 创建和使用ClickHouseCatalog:
- Scala
Plain Text
1val tEnv = TableEnvironment.create(setting)
2
3val props = new util.HashMap[String, String]()
4props.put(ClickHouseConfig.DATABASE_NAME, "default")
5props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
6props.put(ClickHouseConfig.USERNAME, "username")
7props.put(ClickHouseConfig.PASSWORD, "password")
8props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s")
9val cHcatalog = new ClickHouseCatalog("clickhouse", props)
10tEnv.registerCatalog("clickhouse", cHcatalog)
11tEnv.useCatalog("clickhouse")
12
13tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
- Java
Plain Text
1TableEnvironment tEnv = TableEnvironment.create(setting);
2
3Map<String, String> props = new HashMap<>();
4props.put(ClickHouseConfig.DATABASE_NAME, "default")
5props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
6props.put(ClickHouseConfig.USERNAME, "username")
7props.put(ClickHouseConfig.PASSWORD, "password")
8props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");
9Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
10tEnv.registerCatalog("clickhouse", cHcatalog);
11tEnv.useCatalog("clickhouse");
12
13tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
- SQL
Plain Text
1> CREATE CATALOG clickhouse WITH (
2 'type' = 'clickhouse',
3 'url' = 'clickhouse://127.0.0.1:8123',
4 'username' = 'username',
5 'password' = 'password',
6 'database-name' = 'default',
7 'use-local' = 'false',
8 ...
9);
10
11> USE CATALOG clickhouse;
12> SELECT user_id, user_type FROM `default`.`t_user` limit 10;
13> INSERT INTO `default`.`t_user` SELECT ...;