CDN 日志提取中转(ETL)
更新时间:2023-12-07
概览
用户对 CDN 日志进行提取中转,属于 ETL 场景, 用于数据的实时清洗、归并和结构化。
需求场景
所有的 CDN 日志通过 flume 直接推送到 百度消息服务(KAFKA)中作为流式计算 source , 在我们 BSC 中创建 SPARK_STREAM/SQL 类型的作业用于 CDN 日志的提取中转,并实时将结果写到 百度消息服务(KAFKA)或 对象存储(BOS)当中,用户可以对 sink 端的 KAFKA / BOS 进行进一步的处理。
方案概述
服务器 → KAFKA → BSC → KAFKA / BOS → 其他
配置步骤
一个完整的 Spark SQL 作业由 source 表、sink 表和 DML 语句构成。
定义 KAFKA Source 表
SPARK
1CREATE TABLE source_kafka_table (
2 `prefix` STRING,
3 `region` STRING,
4 `userIdSrc` STRING,
5 `clusterNameSrc` STRING,
6 `transDurationSrc` DOUBLE,
7 `srcDurationSrc` STRING,
8 `ts` BIGINT
9) WITH (
10 'connector.type' = 'KAFKA',
11 'format.encode' = 'CSV',
12 'format.attributes.field-delimiter' = ' ',
13 'connector.topic' = 'xxxxxxxxx__bsc-source',
14 'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
15 'connector.properties.ssl.filename' = 'kafka-key_bd.zip'
16);
定义 KAFKA / BOS Sink 表
编写数据提取DML语句
根据 prefix 对日志内容进行提取,并存放到下游的云服务中,为之后的其他处理做数据清洗。
SPARK
1INSERT INTO
2 sink_table outputmode append
3SELECT
4 from_unixtime(`ts`/1000-(`ts`/1000)%60,'yyyy-MM-dd HH') AS `timestamp`,
5 `region`,
6 `userIdSrc`,
7 `clusterNameSrc`
8FROM
9 source_kafka_table
10WHERE
11 prefix = 'xxxxxxxx';