物联网设备实时报警统计(流表Join)
更新时间:2022-12-01
概览
统计每个设备每分钟报警次数。
需求场景
用户拥有1千多台设备,分布在不同城市的多个厂区,每个设备上的传感器大概每5秒采集并上传数据到 物联网核心套件(IoT Core)或 物接入(IoT Hub) 的 MQTT 当中作为第一个 source,一些维度信息存放在 云数据库(RDS)作为第二个 source,在我们 BSC 中创建 SPARK_STREAM/SQL 类型的作业用于每分钟报警次数的统计,并实时将处理结果推送到 云数据库(RDS),最终在 数据可视化(Sugar BI)的可视化报表中展示。
方案概述
用户设备 → IoT Hub → |
|→ BSC → RDS → Sugar BIRDS → |
配置步骤
一个完整的 Spark SQL 作业由 source 表、sink 表和 DML 语句构成。
定义 MQTT Source表
SPARK
1CREATE TABLE source_mqtt_table(
2 sensorId STRING,
3 time STRING,
4 status INTEGER
5) WITH(
6 type = 'MQTT',
7 brokerUrl = 'tcp://duig1nr.mqtt.iot.bj.baidubce.com:1883', --必填
8 topic = 'sensor', --必填
9 username = 'iotdemo', --必填
10 password = 'iotdemo', --必填
11 encode = 'JSON',
12 connectionTimeout = '30', --非必填,访问超时设置,单位:s
13 keepAliveInterval = '60', --非必填,规定时间段内不活动时连接会被断开,单位:s
14 maxBatchMessageNum = 'Int.Max', --非必填,每个batch最大数据条数
15 maxBatchMessageSize = 'Int.Max' --非必填,每个batch最大消息字节数
16);
定义 RDS Source表
SPARK
1CREATE TABLE source_rds_table(
2sensorId STRING,
3sensorType STRING,
4deviceId STRING,
5useTime INTEGER
6) WITH(
7type = 'RDS',
8user = 'rdsdemo', --必填,数据库用户名
9password = 'rdsdemo', --必填,数据库访问密码
10url = 'jdbc:mysql://mysql55.rdsmwi1zrjn5ww8.rds.bd.baidubce.com:3306/bsc_rds_test?useUnicode=true&characterEncoding=UTF8', --必填,jdbc访问RDS的url
11dbTable = 'test' --必填,数据表名称
12);
定义RDS sink表
SPARK
1CREATE TABLE sink_rds_table(
2deviceId STRING,
3time TIMESTAMP,
4nums INTEGER
5) WITH(
6 type = 'RDS',
7 user = 'iotdemo', --必填,数据库用户名
8 password = 'iotdemo11', --必填,数据库访问密码
9 url = 'jdbc:mysql://mysql55.rdsmwi1zrjn5ww8.rds.bd.baidubce.com:3306/bsc_rds? useUnicode=true&characterEncoding=UTF8', --必填,jdbc访问RDS的url
10 dbTable = 'iotdemo' --必填,数据表名称
11);
编写数据统计DML语句
统计这一分钟内每个设备的报警次数。由于使用的是滚动窗口,也就意味着数据将在每分钟结束时候产出一份并写入到RDS。
SPARK
1INSERT INTO
2sink_rds_table outputmode append
3SELECT
4source_rds_table.deviceId,
5CAST(FROM_UNIXTIME(CAST(source_mqtt_table.time AS LONG)) AS TIMESTAMP) AS time,
6count(*) AS nums
7FROM
8source_mqtt_table INNER JOIN source_rds_table ON source_mqtt_table.sensorId = source_rds_table.sensorId
9WHERE
10source_mqtt_table.status = 1
11GROUP BY
12window(time, "1 minute"),
13deviceId