# Import from Spark

Updated: 2025-01-20

Integrating Apache Spark with ClickHouse.
There are two main ways to connect Apache Spark and ClickHouse:

- Spark Connector - implements the DataSourceV2 API and has its own catalog management. As of today, this is the recommended way to integrate ClickHouse and Spark.
- Spark JDBC - integrates Spark and ClickHouse using a JDBC data source.
## Spark Connector

This connector uses ClickHouse-specific optimizations, such as advanced partitioning and predicate pushdown, to improve query performance and data handling. The connector is based on ClickHouse's official JDBC connector and manages its own catalog.
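If you embed Spark in an application instead of launching it from the CLI, the same catalog settings shown in the CLI examples below can be passed when building the session. A minimal Scala sketch, assuming a local ClickHouse instance; host, port, and credentials are placeholder values:

```scala
import org.apache.spark.sql.SparkSession

// Register the ClickHouse catalog programmatically. The conf keys mirror the
// spark-sql/spark-shell launch examples later in this document.
val spark = SparkSession.builder()
  .appName("clickhouse-connector-example")
  .master("local[*]")
  .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
  .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
  .config("spark.sql.catalog.clickhouse.protocol", "http")
  .config("spark.sql.catalog.clickhouse.http_port", "8123")
  .config("spark.sql.catalog.clickhouse.user", "default")
  .config("spark.sql.catalog.clickhouse.password", "")
  .config("spark.sql.catalog.clickhouse.database", "default")
  .getOrCreate()

// Tables in the catalog are then addressable as clickhouse.<database>.<table>.
spark.sql("SHOW DATABASES IN clickhouse").show()
```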
### Requirements

- Java 8 or 17
- Scala 2.12 or 2.13
- Apache Spark 3.3, 3.4, or 3.5
### Compatibility Matrix

| Version | Compatible Spark Versions | ClickHouse JDBC Version |
|---------|---------------------------|-------------------------|
| main    | Spark 3.3, 3.4, 3.5       | 0.6.3                   |
| 0.8.0   | Spark 3.3, 3.4, 3.5       | 0.6.3                   |
| 0.7.3   | Spark 3.3, 3.4            | 0.4.6                   |
| 0.6.0   | Spark 3.3                 | 0.3.2-patch11           |
| 0.5.0   | Spark 3.2, 3.3            | 0.3.2-patch11           |
| 0.4.0   | Spark 3.2, 3.3            | No dependency           |
| 0.3.0   | Spark 3.2, 3.3            | No dependency           |
| 0.2.1   | Spark 3.2                 | No dependency           |
| 0.1.2   | Spark 3.2                 | No dependency           |
### Download the Library

The name pattern of the binary JAR is:

```text
clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar
```

For example, connector 0.8.0 built for Spark 3.5 and Scala 2.13 is named `clickhouse-spark-runtime-3.5_2.13-0.8.0.jar`.

You can find all released JARs in the Maven Central Repository and all daily-built SNAPSHOT JARs in the Sonatype OSS Snapshots Repository.
### Import as a Dependency

- Gradle

```gradle
dependencies {
  implementation("com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}")
  implementation("com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all") { transitive = false }
}
```
Add the following repository if you want to use a SNAPSHOT version:

```gradle
repositories {
  maven { url = "https://s01.oss.sonatype.org/content/repositories/snapshots" }
}
```
- Maven

```xml
<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```
Add the following repository if you want to use a SNAPSHOT version:

```xml
<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>
```
### Using Spark SQL

Note: For use cases involving only SQL, Apache Kyuubi is recommended for production.

#### Launch the Spark SQL CLI

```shell
$SPARK_HOME/bin/spark-sql \
  --conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
  --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
  --conf spark.sql.catalog.clickhouse.protocol=http \
  --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
  --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
  --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
  --conf spark.sql.catalog.clickhouse.database=default \
  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
```
The following argument:

```shell
  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
```

can be replaced with the following, to avoid copying the JARs to the Spark client node:

```shell
  --repositories https://{maven-central-mirror or private-nexus-repo} \
  --packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all
```
#### Operations

Basic operations, such as creating a database, creating a table, writing to a table, and reading from a table:
```
spark-sql> use clickhouse;
Time taken: 0.016 seconds

spark-sql> create database if not exists test_db;
Time taken: 0.022 seconds

spark-sql> show databases;
default
system
test_db
Time taken: 0.289 seconds, Fetched 3 row(s)

spark-sql> CREATE TABLE test_db.tbl_sql (
         >   create_time TIMESTAMP NOT NULL,
         >   m           INT       NOT NULL COMMENT 'part key',
         >   id          BIGINT    NOT NULL COMMENT 'sort key',
         >   value       STRING
         > ) USING ClickHouse
         > PARTITIONED BY (m)
         > TBLPROPERTIES (
         >   engine = 'MergeTree()',
         >   order_by = 'id',
         >   settings.index_granularity = 8192
         > );
Time taken: 0.242 seconds

spark-sql> insert into test_db.tbl_sql values
         > (timestamp'2021-01-01 10:10:10', 1, 1L, '1'),
         > (timestamp'2022-02-02 10:10:10', 2, 2L, '2')
         > as tabl(create_time, m, id, value);
Time taken: 0.276 seconds

spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10    1    1    1
2022-02-02 10:10:10    2    2    2
Time taken: 0.116 seconds, Fetched 2 row(s)

spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 1.028 seconds

spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 0.462 seconds

spark-sql> select count(*) from test_db.tbl_sql;
6
Time taken: 1.421 seconds, Fetched 1 row(s)

spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10    1    1    1
2021-01-01 10:10:10    1    1    1
2021-01-01 10:10:10    1    1    1
2022-02-02 10:10:10    2    2    2
2022-02-02 10:10:10    2    2    2
2022-02-02 10:10:10    2    2    2
Time taken: 0.123 seconds, Fetched 6 row(s)

spark-sql> delete from test_db.tbl_sql where id = 1;
Time taken: 0.129 seconds

spark-sql> select * from test_db.tbl_sql;
2022-02-02 10:10:10    2    2    2
2022-02-02 10:10:10    2    2    2
2022-02-02 10:10:10    2    2    2
Time taken: 0.101 seconds, Fetched 3 row(s)
```
### Using Spark Shell

#### Launch Spark Shell

```shell
$SPARK_HOME/bin/spark-shell \
  --conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
  --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
  --conf spark.sql.catalog.clickhouse.protocol=http \
  --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
  --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
  --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
  --conf spark.sql.catalog.clickhouse.database=default \
  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
```
The following argument:

```shell
  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
```

can be replaced with the following, to avoid copying the JARs to the Spark client node:

```shell
  --repositories https://{maven-central-mirror or private-nexus-repo} \
  --packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all
```
#### Basic Operations

Basic operations, such as creating a database, creating a table, writing to a table, and reading from a table:
```
scala> spark.sql("use clickhouse")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("create database test_db")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show databases").show
+---------+
|namespace|
+---------+
|  default|
|   system|
|  test_db|
+---------+

scala> spark.sql("""
     | CREATE TABLE test_db.tbl (
     |   create_time TIMESTAMP NOT NULL,
     |   m           INT       NOT NULL COMMENT 'part key',
     |   id          BIGINT    NOT NULL COMMENT 'sort key',
     |   value       STRING
     | ) USING ClickHouse
     | PARTITIONED BY (m)
     | TBLPROPERTIES (
     |   engine = 'MergeTree()',
     |   order_by = 'id',
     |   settings.index_granularity = 8192
     | )
     | """)
res2: org.apache.spark.sql.DataFrame = []

scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.createDataFrame(Seq(
    ("2021-01-01 10:10:10", 1L, "1"),
    ("2022-02-02 10:10:10", 2L, "2")
)).toDF("create_time", "id", "value")
  .withColumn("create_time", to_timestamp($"create_time"))
  .withColumn("m", month($"create_time"))
  .select($"create_time", $"m", $"id", $"value")
  .writeTo("test_db.tbl")
  .append

// Exiting paste mode, now interpreting.

scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
|        create_time|  m| id|value|
+-------------------+---+---+-----+
|2021-01-01 10:10:10|  1|  1|    1|
|2022-02-02 10:10:10|  2|  2|    2|
+-------------------+---+---+-----+

scala> spark.sql("DELETE FROM test_db.tbl WHERE id=1")
res3: org.apache.spark.sql.DataFrame = []

scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
|        create_time|  m| id|value|
+-------------------+---+---+-----+
|2022-02-02 10:10:10|  2|  2|    2|
+-------------------+---+---+-----+
```
Execute ClickHouse native SQL:
```
scala> val options = Map(
     |   "host" -> "clickhouse",
     |   "protocol" -> "http",
     |   "http_port" -> "8123",
     |   "user" -> "default",
     |   "password" -> ""
     | )

scala> val sql = """
     | |CREATE TABLE test_db.person (
     | |  id    Int64,
     | |  name  String,
     | |  age   Nullable(Int32)
     | |)
     | |ENGINE = MergeTree()
     | |ORDER BY id
     | """.stripMargin

scala> spark.executeCommand("com.clickhouse.spark.ClickHouseCommandRunner", sql, options)

scala> spark.sql("show tables in clickhouse.test_db").show
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  test_db|   person|      false|
+---------+---------+-----------+

scala> spark.table("clickhouse.test_db.person").printSchema
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = false)
 |-- age: integer (nullable = true)
```
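The table created through native SQL is immediately visible to the connector's catalog, so it can be written to and queried like any other table. A short illustrative follow-up, still in the same spark-shell session (the inserted rows are made up for the example):

```scala
// Insert a couple of sample rows; age is Nullable(Int32), so NULL is allowed.
spark.sql("INSERT INTO clickhouse.test_db.person VALUES (1L, 'Alice', 30), (2L, 'Bob', NULL)")

// Read the rows back through the catalog.
spark.table("clickhouse.test_db.person").show()
```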
## Supported Data Types

This section outlines the data type mappings between Spark and ClickHouse. The tables below provide a quick reference for converting data types when reading from ClickHouse into Spark and when inserting data from Spark into ClickHouse.
### Reading Data from ClickHouse into Spark

| ClickHouse Data Type | Spark Data Type | Supported | Is Primitive | Notes |
|---|---|---|---|---|
| Nothing | NullType | ✅ | Yes | |
| Bool | BooleanType | ✅ | Yes | |
| UInt8, Int16 | ShortType | ✅ | Yes | |
| Int8 | ByteType | ✅ | Yes | |
| UInt16, Int32 | IntegerType | ✅ | Yes | |
| UInt32, Int64, UInt64 | LongType | ✅ | Yes | |
| Int128, UInt128, Int256, UInt256 | DecimalType(38, 0) | ✅ | Yes | |
| Float32 | FloatType | ✅ | Yes | |
| Float64 | DoubleType | ✅ | Yes | |
| String, JSON, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | ✅ | Yes | |
| FixedString | BinaryType, StringType | ✅ | Yes | Controlled by the configuration READ_FIXED_STRING_AS |
| Decimal | DecimalType | ✅ | Yes | Precision and scale up to Decimal128 |
| Decimal32 | DecimalType(9, scale) | ✅ | Yes | |
| Decimal64 | DecimalType(18, scale) | ✅ | Yes | |
| Decimal128 | DecimalType(38, scale) | ✅ | Yes | |
| Date, Date32 | DateType | ✅ | Yes | |
| DateTime, DateTime32, DateTime64 | TimestampType | ✅ | Yes | |
| Array | ArrayType | ✅ | No | Array element type is also converted |
| Map | MapType | ✅ | No | Keys are limited to StringType |
| IntervalYear | YearMonthIntervalType(Year) | ✅ | Yes | |
| IntervalMonth | YearMonthIntervalType(Month) | ✅ | Yes | |
| IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | ✅ | No | A specific interval type is used |
| Object | | ❌ | | |
| Nested | | ❌ | | |
| Tuple | | ❌ | | |
| Point | | ❌ | | |
| Polygon | | ❌ | | |
| MultiPolygon | | ❌ | | |
| Ring | | ❌ | | |
| IntervalQuarter | | ❌ | | |
| IntervalWeek | | ❌ | | |
| Decimal256 | | ❌ | | |
| AggregateFunction | | ❌ | | |
| SimpleAggregateFunction | | ❌ | | |
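For example, the FixedString row above can surface as either BinaryType or StringType, depending on the connector's READ_FIXED_STRING_AS configuration. A hedged sketch; the session conf key spark.clickhouse.read.fixedStringAs and the table fixed_str_tbl are assumptions for illustration:

```scala
// Assumed conf key for the READ_FIXED_STRING_AS switch noted in the table;
// "binary" is assumed to be the default, "string" the alternative.
spark.conf.set("spark.clickhouse.read.fixedStringAs", "string")

// fixed_str_tbl is a hypothetical table with a FixedString(8) column; its
// schema should now report that column as string rather than binary.
spark.table("clickhouse.test_db.fixed_str_tbl").printSchema()
```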
### Inserting Data from Spark into ClickHouse

| Spark Data Type | ClickHouse Data Type | Supported | Is Primitive | Notes |
|---|---|---|---|---|
| BooleanType | UInt8 | ✅ | Yes | |
| ByteType | Int8 | ✅ | Yes | |
| ShortType | Int16 | ✅ | Yes | |
| IntegerType | Int32 | ✅ | Yes | |
| LongType | Int64 | ✅ | Yes | |
| FloatType | Float32 | ✅ | Yes | |
| DoubleType | Float64 | ✅ | Yes | |
| StringType | String | ✅ | Yes | |
| VarcharType | String | ✅ | Yes | |
| CharType | String | ✅ | Yes | |
| DecimalType | Decimal(p, s) | ✅ | Yes | Precision and scale up to Decimal128 |
| DateType | Date | ✅ | Yes | |
| TimestampType | DateTime | ✅ | Yes | |
| ArrayType (list, tuple, or array) | Array | ✅ | No | Array element type is also converted |
| MapType | Map | ✅ | No | Keys are limited to StringType |
| Object | | ❌ | | |
| Nested | | ❌ | | |
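To illustrate the non-primitive ArrayType and MapType rows above (note that map keys must be strings), here is a hedged sketch that creates a hypothetical table tbl_complex through the catalog and inserts array and map values, following the same CREATE TABLE conventions used earlier in this document:

```scala
// Hypothetical table with non-primitive columns; ClickHouse Array and Map
// columns are not nullable, hence NOT NULL on the Spark side.
spark.sql("""
  CREATE TABLE clickhouse.test_db.tbl_complex (
    id    BIGINT NOT NULL COMMENT 'sort key',
    tags  ARRAY<STRING> NOT NULL,
    attrs MAP<STRING, INT> NOT NULL
  ) USING ClickHouse
  TBLPROPERTIES (
    engine = 'MergeTree()',
    order_by = 'id'
  )
""")

// Element and key/value types are converted per the tables above;
// map keys are limited to StringType.
spark.sql("""
  INSERT INTO clickhouse.test_db.tbl_complex VALUES
    (1L, array('a', 'b'), map('x', 1, 'y', 2)),
    (2L, array('c'), map('z', 3))
""")
```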
## Spark JDBC

### Read Data
```java
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadData {
    public static void main(String[] args) {
        // Initialize Spark session
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        // JDBC connection details
        String jdbcUrl = "jdbc:ch://localhost:8123/default";
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        // Load the table from ClickHouse
        Dataset<Row> df = spark.read().jdbc(jdbcUrl, "example_table", jdbcProperties);

        // Show the DataFrame
        df.show();

        // Stop the Spark session
        spark.stop();
    }
}
```
### Write Data

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class WriteData {
    public static void main(String[] args) {
        // Initialize Spark session
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        // JDBC connection details
        String jdbcUrl = "jdbc:ch://localhost:8123/default";
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "******");

        // Create a sample DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false)
        });

        List<Row> rows = new ArrayList<Row>();
        rows.add(RowFactory.create(1, "John"));
        rows.add(RowFactory.create(2, "Doe"));

        Dataset<Row> df = spark.createDataFrame(rows, schema);

        // Append the DataFrame to the target table
        df.write()
                .mode(SaveMode.Append)
                .jdbc(jdbcUrl, "my_table", jdbcProperties);

        // Show the DataFrame
        df.show();

        // Stop the Spark session
        spark.stop();
    }
}
```