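Using PALO with Iceberg
As a new, open data management architecture, the data lakehouse combines the high performance and real-time capabilities of a data warehouse with the low cost and flexibility of a data lake. It helps users meet a wide range of data processing and analysis needs more conveniently, and it is seeing growing adoption in enterprise big data platforms.
Over the past several releases, PALO has steadily deepened its integration with data lakes and has now evolved into a mature lakehouse solution.
- Since version 0.15, PALO has supported Hive and Iceberg external tables, an early exploration of combining its capabilities with data lakes on top of Apache Iceberg.
- Since version 1.2, PALO has officially provided the Multi-Catalog feature, which automatically maps metadata and accesses data for multiple data sources, along with many performance optimizations for external data reads and query execution. With this, PALO became fully capable of building a fast, easy-to-use Lakehouse architecture.
- In version 2.1, the PALO lakehouse architecture was comprehensively strengthened. It improved reading from and writing to mainstream data lake formats (Hudi, Iceberg, Paimon, and others) and added compatibility with multiple SQL dialects, so that users can switch seamlessly from their existing systems to PALO. For data science and large-scale data reading scenarios, PALO integrated the Arrow Flight high-speed read interface, delivering a 100-fold improvement in data transfer efficiency.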
PALO & Iceberg
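Apache Iceberg is an open-source, high-performance, highly reliable data lake table format for analyzing and managing data at very large scale. It is supported by many mainstream query engines, including PALO, and is compatible with HDFS and a variety of cloud object stores. Features such as ACID, schema evolution, advanced filtering, hidden partitioning, and partition layout evolution ensure high query performance together with data reliability and consistency, while time travel and version rollback add considerable flexibility to data management.
PALO natively supports many of Iceberg's core features:
- Supports multiple Iceberg Catalog types, including Hive Metastore, Hadoop, REST, Glue, Google Dataproc Metastore, and DLF.
- Natively supports the Iceberg V1/V2 table formats, as well as reading Position Delete and Equality Delete files.
- Supports querying Iceberg table snapshot history through table functions.
- Supports Time Travel.
- Natively supports the Iceberg table engine. You can create and manage Iceberg tables and write data to them directly from PALO. A full set of partition Transform functions is supported, providing capabilities such as hidden partitioning and partition layout evolution.
With PALO + Apache Iceberg, users can quickly build an efficient lakehouse solution that flexibly handles the various needs of real-time data analysis and processing:
- Use the high-performance PALO query engine to run join analysis across Iceberg tables and other data sources, building a unified federated data analysis platform.
- Manage and build Iceberg tables directly from PALO, cleaning and processing data in PALO and writing it to Iceberg tables, building a unified lakehouse data processing platform.
- Use the Iceberg table engine to share PALO data with other upstream and downstream systems for further processing, building a unified open data storage platform.
In the future, Apache Iceberg will serve as one of PALO's native table engines, offering more complete analysis and management of lake-format data. PALO will also gradually support more advanced Apache Iceberg features, including Update/Delete/Merge, sorting on write-back, incremental data reads, and metadata management, to jointly build a unified, high-performance, real-time lakehouse platform.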
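User Guide
This document explains how to quickly set up a PALO + Apache Iceberg test and demo environment with Docker, and walks through how to use each feature.
01 Environment Preparation
The examples in this document are deployed with Docker Compose. The components and their versions are as follows:
Component | Version |
---|---|
PALO | 2.1.5 by default; can be changed |
Apache Iceberg | 1.4.3 |
MinIO | RELEASE.2024-04-29T09-56-05Z |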
02 Environment Deployment
- Start all components:
    bash ./start_all.sh
- After startup, log in to the PALO command line with the following script:
    # login doris
    bash ./start_doris_client.sh
03 Creating an Iceberg Table
After you log in to the PALO command line, a Catalog named iceberg already exists in the PALO cluster (you can check it with SHOW CATALOGS / SHOW CREATE CATALOG iceberg). The statement that created this Catalog is:
-- Already created; no need to run
CREATE CATALOG `iceberg` PROPERTIES (
    "type" = "iceberg",
    "iceberg.catalog.type" = "rest",
    "warehouse" = "s3://warehouse/",
    "uri" = "http://rest:8181",
    "s3.access_key" = "admin",
    "s3.secret_key" = "password",
    "s3.endpoint" = "http://minio:9000"
);
Create a database and an Iceberg table in the Iceberg Catalog:
mysql> SWITCH iceberg;
Query OK, 0 rows affected (0.00 sec)

mysql> CREATE DATABASE nyc;
Query OK, 0 rows affected (0.12 sec)

mysql> CREATE TABLE iceberg.nyc.taxis
       (
           vendor_id BIGINT,
           trip_id BIGINT,
           trip_distance FLOAT,
           fare_amount DOUBLE,
           store_and_fwd_flag STRING,
           ts DATETIME
       )
       PARTITION BY LIST (vendor_id, DAY(ts)) ()
       PROPERTIES (
           "compression-codec" = "zstd",
           "write-format" = "parquet"
       );
Query OK, 0 rows affected (0.15 sec)
04 Writing Data
Insert data into the Iceberg table:
mysql> INSERT INTO iceberg.nyc.taxis
       VALUES
        (1, 1000371, 1.8, 15.32, 'N', '2024-01-01 9:15:23'),
        (2, 1000372, 2.5, 22.15, 'N', '2024-01-02 12:10:11'),
        (2, 1000373, 0.9, 9.01, 'N', '2024-01-01 3:25:15'),
        (1, 1000374, 8.4, 42.13, 'Y', '2024-01-03 7:12:33');
Query OK, 4 rows affected (1.61 sec)
{'status':'COMMITTED', 'txnId':'10085'}
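To make the effect of the partition spec concrete, the following sketch computes the (vendor_id, DAY(ts)) partition tuple for the four rows inserted above. It is a minimal illustration in plain Python, not PALO's or Iceberg's implementation: the helper name partition_key is hypothetical, and DAY(ts) is approximated here by the calendar date of the timestamp.

```python
from datetime import date, datetime


def partition_key(vendor_id: int, ts: str) -> tuple:
    """Return the (vendor_id, day(ts)) tuple for one row (illustrative only)."""
    parsed = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
    return vendor_id, parsed.date()


# (vendor_id, ts) pairs copied from the INSERT statement above.
rows = [
    (1, "2024-01-01 9:15:23"),
    (2, "2024-01-02 12:10:11"),
    (2, "2024-01-01 3:25:15"),
    (1, "2024-01-03 7:12:33"),
]

# Collect the distinct partition tuples touched by this insert.
partitions = {partition_key(vendor_id, ts) for vendor_id, ts in rows}
print(len(partitions))
```

Under these assumptions each row falls into its own partition, which is consistent with the snapshot summary in the Time Travel section reporting "changed-partition-count":"4" for this insert.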
Create an Iceberg table with CREATE TABLE AS SELECT:
mysql> CREATE TABLE iceberg.nyc.taxis2 AS SELECT * FROM iceberg.nyc.taxis;
Query OK, 6 rows affected (0.25 sec)
{'status':'COMMITTED', 'txnId':'10088'}
05 Querying Data
- Simple query
mysql> SELECT * FROM iceberg.nyc.taxis;
+-----------+---------+---------------+-------------+--------------------+----------------------------+
| vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | ts                         |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
|         1 | 1000374 |           8.4 |       42.13 | Y                  | 2024-01-03 07:12:33.000000 |
|         1 | 1000371 |           1.8 |       15.32 | N                  | 2024-01-01 09:15:23.000000 |
|         2 | 1000373 |           0.9 |        9.01 | N                  | 2024-01-01 03:25:15.000000 |
|         2 | 1000372 |           2.5 |       22.15 | N                  | 2024-01-02 12:10:11.000000 |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
4 rows in set (0.37 sec)

mysql> SELECT * FROM iceberg.nyc.taxis2;
+-----------+---------+---------------+-------------+--------------------+----------------------------+
| vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | ts                         |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
|         1 | 1000374 |           8.4 |       42.13 | Y                  | 2024-01-03 07:12:33.000000 |
|         1 | 1000371 |           1.8 |       15.32 | N                  | 2024-01-01 09:15:23.000000 |
|         2 | 1000373 |           0.9 |        9.01 | N                  | 2024-01-01 03:25:15.000000 |
|         2 | 1000372 |           2.5 |       22.15 | N                  | 2024-01-02 12:10:11.000000 |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
4 rows in set (0.35 sec)
- Partition pruning
mysql> SELECT * FROM iceberg.nyc.taxis where vendor_id = 2 and ts >= '2024-01-01' and ts < '2024-01-02';
+-----------+---------+---------------+-------------+--------------------+----------------------------+
| vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | ts                         |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
|         2 | 1000373 |           0.9 |        9.01 | N                  | 2024-01-01 03:25:15.000000 |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
1 row in set (0.06 sec)

mysql> EXPLAIN VERBOSE SELECT * FROM iceberg.nyc.taxis where vendor_id = 2 and ts >= '2024-01-01' and ts < '2024-01-02';

....
|   0:VICEBERG_SCAN_NODE(71)
|      table: taxis
|      predicates: (ts[#5] < '2024-01-02 00:00:00'), (vendor_id[#0] = 2), (ts[#5] >= '2024-01-01 00:00:00')
|      inputSplitNum=1, totalFileSize=3539, scanRanges=1
|      partition=1/0
|      backends:
|        10002
|          s3://warehouse/wh/nyc/taxis/data/vendor_id=2/ts_day=2024-01-01/40e6ca404efa4a44-b888f23546d3a69c_5708e229-2f3d-4b68-a66b-44298a9d9815-0.zstd.parquet start: 0 length: 3539
|      cardinality=6, numNodes=1
|      pushdown agg=NONE
|      icebergPredicatePushdown=
|      ref(name="ts") < 1704153600000000
|      ref(name="vendor_id") == 2
|      ref(name="ts") >= 1704067200000000
....
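The EXPLAIN VERBOSE output shows that the predicate vendor_id = 2 and ts >= '2024-01-01' and ts < '2024-01-02' ends up hitting only one partition (partition=1/0). It also shows that, because the partition Transform function DAY(ts) was specified when the table was created, the original value 2024-01-01 03:25:15.000000 is converted into the partition information ts_day=2024-01-01 in the file directory.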
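The plan above can be checked by hand with a small sketch. It converts the two timestamp bounds into microseconds since the Unix epoch, which is how they appear under icebergPredicatePushdown, rebuilds the partition directory name, and filters the four known partitions by the predicate. All helper names are hypothetical, the timestamps are assumed to be interpreted as UTC (which reproduces the literals in the plan), and the filtering is a simplified stand-in for what the engine does.

```python
from datetime import datetime, timezone

FMT = "%Y-%m-%d %H:%M:%S"


def to_micros(ts: str) -> int:
    """Microseconds since the Unix epoch, treating ts as UTC (assumption)."""
    parsed = datetime.strptime(ts, FMT).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp()) * 1_000_000


def partition_path(vendor_id: int, ts: str) -> str:
    """Directory fragment in the layout seen in the EXPLAIN output."""
    day = datetime.strptime(ts, FMT).date()
    return f"vendor_id={vendor_id}/ts_day={day.isoformat()}"


lower = to_micros("2024-01-01 00:00:00")
upper = to_micros("2024-01-02 00:00:00")

# The four partitions written by the first INSERT, as (vendor_id, day) pairs.
partitions = [(1, "2024-01-01"), (2, "2024-01-02"), (2, "2024-01-01"), (1, "2024-01-03")]

# Keep only partitions that can contain rows matching the predicate.
matching = [p for p in partitions if p[0] == 2 and "2024-01-01" <= p[1] < "2024-01-02"]

print(lower, upper)
print(partition_path(2, "2024-01-01 03:25:15"))
print(matching)
```

Only the partition vendor_id=2/ts_day=2024-01-01 survives the filter, matching the single file listed in the scan node.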
06 Time Travel
First, insert a few more rows:
mysql> INSERT INTO iceberg.nyc.taxis VALUES (1, 1000375, 8.8, 55.55, 'Y', '2024-01-01 8:10:22'), (3, 1000376, 7.4, 32.35, 'N', '2024-01-02 1:14:45');
Query OK, 2 rows affected (0.17 sec)
{'status':'COMMITTED', 'txnId':'10086'}

mysql> SELECT * FROM iceberg.nyc.taxis;
+-----------+---------+---------------+-------------+--------------------+----------------------------+
| vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | ts                         |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
|         3 | 1000376 |           7.4 |       32.35 | N                  | 2024-01-02 01:14:45.000000 |
|         2 | 1000372 |           2.5 |       22.15 | N                  | 2024-01-02 12:10:11.000000 |
|         1 | 1000374 |           8.4 |       42.13 | Y                  | 2024-01-03 07:12:33.000000 |
|         1 | 1000371 |           1.8 |       15.32 | N                  | 2024-01-01 09:15:23.000000 |
|         1 | 1000375 |           8.8 |       55.55 | Y                  | 2024-01-01 08:10:22.000000 |
|         2 | 1000373 |           0.9 |        9.01 | N                  | 2024-01-01 03:25:15.000000 |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
6 rows in set (0.11 sec)
Use the iceberg_meta table function to query the table's snapshot information:
mysql> select * from iceberg_meta("table" = "iceberg.nyc.taxis", "query_type" = "snapshots");
+---------------------+---------------------+---------------------+-----------+-----------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| committed_at        | snapshot_id         | parent_id           | operation | manifest_list                                                                                             | summary                                                                                                                                                                                                                                                        |
+---------------------+---------------------+---------------------+-----------+-----------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 2024-07-29 03:38:22 | 8483933166442433486 |                  -1 | append    | s3://warehouse/wh/nyc/taxis/metadata/snap-8483933166442433486-1-5f7b7736-8022-4ba1-9db2-51ae7553be4d.avro | {"added-data-files":"4","added-records":"4","added-files-size":"14156","changed-partition-count":"4","total-records":"4","total-files-size":"14156","total-data-files":"4","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0"} |
| 2024-07-29 03:40:23 | 4726331391239920914 | 8483933166442433486 | append    | s3://warehouse/wh/nyc/taxis/metadata/snap-4726331391239920914-1-6aa3d142-6c9c-4553-9c04-08ad4d49a4ea.avro | {"added-data-files":"2","added-records":"2","added-files-size":"7078","changed-partition-count":"2","total-records":"6","total-files-size":"21234","total-data-files":"6","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0"}  |
+---------------------+---------------------+---------------------+-----------+-----------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set (0.07 sec)
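The summary column is a JSON string, so its counters are easy to inspect programmatically. The sketch below parses abbreviated copies of the two summaries above and checks that, for these append-only snapshots, each total-records value equals the parent's total plus added-records. The function name check_running_totals is hypothetical, and the check is only meant for append operations like the ones in this walkthrough.

```python
import json

# Abbreviated copies of the two summary values returned above
# (only the record and data-file counters are kept).
snapshots = [
    {
        "snapshot_id": 8483933166442433486,
        "parent_id": -1,
        "summary": '{"added-data-files":"4","added-records":"4",'
                   '"total-records":"4","total-data-files":"4"}',
    },
    {
        "snapshot_id": 4726331391239920914,
        "parent_id": 8483933166442433486,
        "summary": '{"added-data-files":"2","added-records":"2",'
                   '"total-records":"6","total-data-files":"6"}',
    },
]


def check_running_totals(snaps) -> bool:
    """Return True if total-records == parent's total + added-records for every snapshot."""
    totals = {-1: 0}  # parent_id -1 marks the first snapshot in the output above
    for snap in snaps:
        summary = json.loads(snap["summary"])
        expected = totals[snap["parent_id"]] + int(summary["added-records"])
        if expected != int(summary["total-records"]):
            return False
        totals[snap["snapshot_id"]] = int(summary["total-records"])
    return True


print(check_running_totals(snapshots))
```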
Use the FOR VERSION AS OF clause to query a specific snapshot:
mysql> SELECT * FROM iceberg.nyc.taxis FOR VERSION AS OF 8483933166442433486;
+-----------+---------+---------------+-------------+--------------------+----------------------------+
| vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | ts                         |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
|         1 | 1000371 |           1.8 |       15.32 | N                  | 2024-01-01 09:15:23.000000 |
|         1 | 1000374 |           8.4 |       42.13 | Y                  | 2024-01-03 07:12:33.000000 |
|         2 | 1000372 |           2.5 |       22.15 | N                  | 2024-01-02 12:10:11.000000 |
|         2 | 1000373 |           0.9 |        9.01 | N                  | 2024-01-01 03:25:15.000000 |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
4 rows in set (0.05 sec)

mysql> SELECT * FROM iceberg.nyc.taxis FOR VERSION AS OF 4726331391239920914;
+-----------+---------+---------------+-------------+--------------------+----------------------------+
| vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | ts                         |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
|         1 | 1000374 |           8.4 |       42.13 | Y                  | 2024-01-03 07:12:33.000000 |
|         1 | 1000375 |           8.8 |       55.55 | Y                  | 2024-01-01 08:10:22.000000 |
|         3 | 1000376 |           7.4 |       32.35 | N                  | 2024-01-02 01:14:45.000000 |
|         2 | 1000372 |           2.5 |       22.15 | N                  | 2024-01-02 12:10:11.000000 |
|         2 | 1000373 |           0.9 |        9.01 | N                  | 2024-01-01 03:25:15.000000 |
|         1 | 1000371 |           1.8 |       15.32 | N                  | 2024-01-01 09:15:23.000000 |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
6 rows in set (0.04 sec)
Use the FOR TIME AS OF clause to query the snapshot that was current at a given point in time:
mysql> SELECT * FROM iceberg.nyc.taxis FOR TIME AS OF "2024-07-29 03:38:23";
+-----------+---------+---------------+-------------+--------------------+----------------------------+
| vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | ts                         |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
|         1 | 1000374 |           8.4 |       42.13 | Y                  | 2024-01-03 07:12:33.000000 |
|         1 | 1000371 |           1.8 |       15.32 | N                  | 2024-01-01 09:15:23.000000 |
|         2 | 1000372 |           2.5 |       22.15 | N                  | 2024-01-02 12:10:11.000000 |
|         2 | 1000373 |           0.9 |        9.01 | N                  | 2024-01-01 03:25:15.000000 |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
4 rows in set (0.04 sec)

mysql> SELECT * FROM iceberg.nyc.taxis FOR TIME AS OF "2024-07-29 03:40:22";
+-----------+---------+---------------+-------------+--------------------+----------------------------+
| vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | ts                         |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
|         2 | 1000373 |           0.9 |        9.01 | N                  | 2024-01-01 03:25:15.000000 |
|         1 | 1000374 |           8.4 |       42.13 | Y                  | 2024-01-03 07:12:33.000000 |
|         2 | 1000372 |           2.5 |       22.15 | N                  | 2024-01-02 12:10:11.000000 |
|         1 | 1000371 |           1.8 |       15.32 | N                  | 2024-01-01 09:15:23.000000 |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
4 rows in set (0.05 sec)
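Both timestamps above fall after the first commit (03:38:22) and before the second (03:40:23), which is why each query returns the original four rows. The sketch below models that resolution rule as "pick the latest snapshot committed at or before the requested time". It is an illustration of the semantics only, not PALO's implementation; the function name snapshot_as_of is hypothetical, and the commit times are taken at the one-second precision shown by iceberg_meta.

```python
from datetime import datetime
from typing import Optional

FMT = "%Y-%m-%d %H:%M:%S"

# (committed_at, snapshot_id) pairs copied from the iceberg_meta output.
SNAPSHOTS = [
    ("2024-07-29 03:38:22", 8483933166442433486),
    ("2024-07-29 03:40:23", 4726331391239920914),
]


def snapshot_as_of(ts: str) -> Optional[int]:
    """Return the id of the latest snapshot committed at or before ts, if any."""
    target = datetime.strptime(ts, FMT)
    eligible = [
        (datetime.strptime(committed_at, FMT), snapshot_id)
        for committed_at, snapshot_id in SNAPSHOTS
        if datetime.strptime(committed_at, FMT) <= target
    ]
    # max() orders by commit time first, so it yields the most recent eligible snapshot.
    return max(eligible)[1] if eligible else None


print(snapshot_as_of("2024-07-29 03:38:23"))
print(snapshot_as_of("2024-07-29 03:40:22"))
print(snapshot_as_of("2024-07-29 03:40:23"))
```

A time earlier than the first commit yields None in this sketch, since no snapshot exists yet at that point.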
07 Interacting with PyIceberg
This section requires PALO 2.1.8 / 3.0.4 or later.
Load the Iceberg table:
from pyiceberg.catalog import load_catalog

# Connect to the same REST catalog and MinIO endpoint that PALO uses.
catalog = load_catalog(
    "iceberg",
    **{
        "warehouse": "warehouse",
        "uri": "http://rest:8181",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "s3.endpoint": "http://minio:9000",
    },
)
table = catalog.load_table("nyc.taxis")
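The PALO CREATE CATALOG statement and the PyIceberg load_catalog call point at the same services but spell some property keys differently. The sketch below renames the keys that appear in both examples. The helper to_pyiceberg_properties and the mapping table are hypothetical conveniences built only from the two snippets in this document; "type", "iceberg.catalog.type" and "warehouse" are deliberately left out because the two examples set them differently.

```python
# Hypothetical mapping, derived from the two examples in this document:
# PALO catalog property name -> PyIceberg property name.
PALO_TO_PYICEBERG = {
    "uri": "uri",
    "s3.endpoint": "s3.endpoint",
    "s3.access_key": "s3.access-key-id",
    "s3.secret_key": "s3.secret-access-key",
}


def to_pyiceberg_properties(palo_props: dict) -> dict:
    """Rename the known keys and drop everything else (illustrative only)."""
    return {
        PALO_TO_PYICEBERG[key]: value
        for key, value in palo_props.items()
        if key in PALO_TO_PYICEBERG
    }


palo_props = {
    "type": "iceberg",
    "iceberg.catalog.type": "rest",
    "warehouse": "s3://warehouse/",
    "uri": "http://rest:8181",
    "s3.access_key": "admin",
    "s3.secret_key": "password",
    "s3.endpoint": "http://minio:9000",
}

print(to_pyiceberg_properties(palo_props))
```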
Read as an Arrow Table:
print(table.scan().to_arrow())

pyarrow.Table
vendor_id: int64
trip_id: int64
trip_distance: float
fare_amount: double
store_and_fwd_flag: large_string
ts: timestamp[us]
----
vendor_id: [[1],[1],[2],[2]]
trip_id: [[1000371],[1000374],[1000373],[1000372]]
trip_distance: [[1.8],[8.4],[0.9],[2.5]]
fare_amount: [[15.32],[42.13],[9.01],[22.15]]
store_and_fwd_flag: [["N"],["Y"],["N"],["N"]]
ts: [[2024-01-01 09:15:23.000000],[2024-01-03 07:12:33.000000],[2024-01-01 03:25:15.000000],[2024-01-02 12:10:11.000000]]
Read as a Pandas DataFrame:
print(table.scan().to_pandas())

   vendor_id  trip_id  trip_distance  fare_amount store_and_fwd_flag                  ts
0          1  1000371            1.8        15.32                  N 2024-01-01 09:15:23
1          1  1000374            8.4        42.13                  Y 2024-01-03 07:12:33
2          2  1000373            0.9         9.01                  N 2024-01-01 03:25:15
3          2  1000372            2.5        22.15                  N 2024-01-02 12:10:11
Read as a Polars DataFrame:
import polars as pl

print(pl.scan_iceberg(table).collect())

shape: (4, 6)
┌───────────┬─────────┬───────────────┬─────────────┬────────────────────┬─────────────────────┐
│ vendor_id ┆ trip_id ┆ trip_distance ┆ fare_amount ┆ store_and_fwd_flag ┆ ts                  │
│ ---       ┆ ---     ┆ ---           ┆ ---         ┆ ---                ┆ ---                 │
│ i64       ┆ i64     ┆ f32           ┆ f64         ┆ str                ┆ datetime[μs]        │
╞═══════════╪═════════╪═══════════════╪═════════════╪════════════════════╪═════════════════════╡
│ 1         ┆ 1000371 ┆ 1.8           ┆ 15.32       ┆ N                  ┆ 2024-01-01 09:15:23 │
│ 1         ┆ 1000374 ┆ 8.4           ┆ 42.13       ┆ Y                  ┆ 2024-01-03 07:12:33 │
│ 2         ┆ 1000373 ┆ 0.9           ┆ 9.01        ┆ N                  ┆ 2024-01-01 03:25:15 │
│ 2         ┆ 1000372 ┆ 2.5           ┆ 22.15       ┆ N                  ┆ 2024-01-02 12:10:11 │
└───────────┴─────────┴───────────────┴─────────────┴────────────────────┴─────────────────────┘
08 Appendix
Writing data with PyIceberg
Load the Iceberg table:
from pyiceberg.catalog import load_catalog

# Connect to the same REST catalog and MinIO endpoint that PALO uses.
catalog = load_catalog(
    "iceberg",
    **{
        "warehouse": "warehouse",
        "uri": "http://rest:8181",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "s3.endpoint": "http://minio:9000",
    },
)
table = catalog.load_table("nyc.taxis")
Write an Arrow Table to Iceberg:
import pyarrow as pa
import pyarrow.compute as pc

# Column types mirror the Iceberg table schema (e.g. float32 for trip_distance).
df = pa.Table.from_pydict(
    {
        "vendor_id": pa.array([1, 2, 2, 1], pa.int64()),
        "trip_id": pa.array([1000371, 1000372, 1000373, 1000374], pa.int64()),
        "trip_distance": pa.array([1.8, 2.5, 0.9, 8.4], pa.float32()),
        "fare_amount": pa.array([15.32, 22.15, 9.01, 42.13], pa.float64()),
        "store_and_fwd_flag": pa.array(["N", "N", "N", "Y"], pa.string()),
        "ts": pc.strptime(
            ["2024-01-01 9:15:23", "2024-01-02 12:10:11", "2024-01-01 3:25:15", "2024-01-03 7:12:33"],
            "%Y-%m-%d %H:%M:%S",
            "us",
        ),
    }
)
table.append(df)
Write a Pandas DataFrame to Iceberg:
import pyarrow as pa
import pandas as pd

df = pd.DataFrame(
    {
        "vendor_id": pd.Series([1, 2, 2, 1]).astype("int64[pyarrow]"),
        "trip_id": pd.Series([1000371, 1000372, 1000373, 1000374]).astype("int64[pyarrow]"),
        "trip_distance": pd.Series([1.8, 2.5, 0.9, 8.4]).astype("float32[pyarrow]"),
        "fare_amount": pd.Series([15.32, 22.15, 9.01, 42.13]).astype("float64[pyarrow]"),
        "store_and_fwd_flag": pd.Series(["N", "N", "N", "Y"]).astype("string[pyarrow]"),
        "ts": pd.Series(["2024-01-01 9:15:23", "2024-01-02 12:10:11", "2024-01-01 3:25:15", "2024-01-03 7:12:33"]).astype("timestamp[us][pyarrow]"),
    }
)
table.append(pa.Table.from_pandas(df))
Write a Polars DataFrame to Iceberg:
import polars as pl

df = pl.DataFrame(
    {
        "vendor_id": [1, 2, 2, 1],
        "trip_id": [1000371, 1000372, 1000373, 1000374],
        "trip_distance": [1.8, 2.5, 0.9, 8.4],
        "fare_amount": [15.32, 22.15, 9.01, 42.13],
        "store_and_fwd_flag": ["N", "N", "N", "Y"],
        "ts": ["2024-01-01 9:15:23", "2024-01-02 12:10:11", "2024-01-01 3:25:15", "2024-01-03 7:12:33"],
    },
    {
        "vendor_id": pl.Int64,
        "trip_id": pl.Int64,
        "trip_distance": pl.Float32,
        "fare_amount": pl.Float64,
        "store_and_fwd_flag": pl.String,
        "ts": pl.String,
    },
).with_columns(pl.col("ts").str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S"))
table.append(df.to_arrow())