CDN 接口日志聚合统计
更新时间:2023-12-07
概览
用户对 CDN 接口日志进行聚合统计。
需求场景
所有的 CDN 接口调用日志通过 flume 直接推送到 百度消息服务(KAFKA)中作为流式计算 source , 在我们 BSC 中创建 SPARK_STREAM/SQL 类型的作业用于 CDN 接口调用日志的聚合统计,并实时将聚合结果写到 百度数据仓库(Palo)当中,用户可以利用 数据可视化工具(如 Sugar BI)等调用 Palo 的 API 完成数据展示。
方案概述
服务器 → KAFKA → BSC → Palo → Sugar BI
配置步骤
一个完整的 Spark SQL 作业由 source 表、sink 表和 DML 语句构成。
定义 KAFKA Source 表
SPARK
1CREATE TABLE source_kafka_table (
2 `prefix` STRING,
3 `region` STRING,
4 `userIdSrc` STRING,
5 `clusterNameSrc` STRING,
6 `transDurationSrc` DOUBLE,
7 `srcDurationSrc` STRING,
8 `ts` BIGINT
9) WITH (
10 'connector.type' = 'KAFKA',
11 'format.encode' = 'CSV',
12 'format.attributes.field-delimiter' = ' ', -- 分隔符为空格
13 'connector.topic' = 'xxxxxxxxx__bsc-source',
14 'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
15 'connector.properties.ssl.filename' = 'kafka-key_bd.zip'
16);
定义 Palo Sink 表
编写数据聚合DML语句
按照某些值和指定的时间进行聚合,没有使用窗口,而是定义 5 分钟的微批触发时间来完成聚合,并且聚合状态要设置为 no state
SPARK
1INSERT INTO
2 sink_palo_table outputmode append
3SELECT
4 format_string('%s,%d,%s,%s,%s,%d,%f,%f\n',
5 from_unixtime(`ts`/1000-(`ts`/1000)%60,'yyyy-MM-dd HH'),
6 unix_timestamp(from_unixtime(`ts`/1000-(`ts`/1000)%60,'yyyy-MM-dd HH:mm'),'yyyy-MM-dd HH:mm'),
7 `region`,
8 `userIdSrc`,
9 `clusterNameSrc`,
10 count(*),
11 sum(if(`srcDurationSrc` != 'null', cast(`srcDurationSrc` as double), 0)/(if(`transDurationSrc` != 0, `transDurationSrc`, 0.01))),
12 sum(`transDurationSrc`)
13 )
14FROM
15 source_kafka_table
16WHERE
17 prefix = 'xxxxxxxx'
18GROUP BY
19 `userIdSrc`,
20 `clusterNameSrc`,
21 `region`,
22 from_unixtime(`ts`/1000-(`ts`/1000)%60,'yyyy-MM-dd HH'),
23 unix_timestamp(from_unixtime(`ts`/1000-(`ts`/1000)%60,'yyyy-MM-dd HH:mm'),'yyyy-MM-dd HH:mm') aggregate no state; -- 聚合过程设置为无状态