常用查询语句示例
此文档介绍实际业务场景中常用的一些格式转换语句用法,用户可以参考这些示例完成数据查询语句的编写。
消息示例
示例消息包含了大部分常见的设备上报消息格式,如无特殊说明将以此示例消息或此消息的一部分作为原始数据进行演示。
1 {
2 "reqId":"442c1da4-9d3a-4f9b-a6e9-bfe858e4ac43",
3 "method":"thing.event.post",
4 "timestamp":1465376157007,
5 "deviceName":"device001",
6 "productName":"product001",
7 "params":{
8 "host":"192.166.1.1",
9 "color":"red",
10 "historydDatas":[
11 {
12 "key":"temperature",
13 "value":50,
14 "time":1465376157007
15 },
16 {
17 "key":"humidity",
18 "value":80,
19 "time":1465376157337
20 },
21 {
22 "key":"pressure",
23 "value":8.5,
24 "time":1465376157337
25 },
26 {
27 "key":"speed",
28 "value":80,
29 "time":1465376157337
30 }
31 ]
32 },
33 "properties":{
34 "temperature":35,
35 "humidity":22
36 },
37 "other":{
38 "Misc":null
39 }
40 }
基础用法
固定格式A转换为固定格式B
- 适用场景
适合输入消息格式固定且元素个数固定,仅对数据进行格式转换的场景。
可以选取原始消息中任何值赋值给目标格式对应key,可以将函数返回值(如$millis()获取时间戳)、常量("msgtype":"property")等原始消息中没有的信息到目标格式中。
- 原始消息
1 {
2 "reqId":"442c1da4-9d3a-4f9b-a6e9-bfe858e4ac43",
3 "deviceName":"device001",
4 "productName":"product001",
5 "properties":{
6 "temperature":35,
7 "humidity":22
8 },
9 "other":{
10 "Misc":null
11 }
12 }
- 目标数据
1 {
2 "token": "442c1da4-9d3a-4f9b-a6e9-bfe858e4ac43",
3 "msgtype": "property",
4 "timestamp": 1596821067991,
5 "tags": {
6 "deviceName": "device001",
7 "productName": "product001"
8 },
9 "report": {
10 "temperature_new": 35,
11 "humidity_new": 22
12 }
13 }
- 查询语句
1 {
2 "token": reqId,
3 "msgtype":"property",
4 "timestamp":$millis(),
5 "tags":{
6 "deviceName":deviceName,
7 "productName":productName
8 },
9 "report":{
10 "temperature_new":properties.temperature,
11 "humidity_new":properties.humidity
12 }
13 }
- 详解
语法 | 描述 |
---|---|
"token": reqId | 将原始消息中reqId重命名为token |
"msgtype":"property" | 向目标数据中添加msgtype属性并赋给固定值"property" |
"timestamp":$millis() | 向目标数据中添加timestamp属性并赋给从平台获得的时间戳 |
"tags":{"deviceName":deviceName,"productName":productName} | 将deviceName、productName组装成一个对象赋值给属性tags |
高阶用法
向原始消息中添加属性
-
适用场景
保持原始消息中所有keys不变的情况,向原始消息中添加额外的属性
- 原始消息
1 {
2 "reqId":"442c1da4-9d3a-4f9b-a6e9-bfe858e4ac43",
3 "method":"thing.event.post",
4 "deviceName":"device001",
5 "productName":"product001",
6 "properties":{
7 "temperature":35,
8 "humidity":22
9 }
10 }
- 目标数据
添加timestamp、a两个新属性到原始消息中
1 {
2 "reqId": "442c1da4-9d3a-4f9b-a6e9-bfe858e4ac43",
3 "method": "thing.event.post",
4 "timestamp": 1596815775720,
5 "a":100,
6 "deviceName": "device001",
7 "productName": "product001",
8 "properties": {
9 "temperature": 35,
10 "humidity": 22
11 }
12 }
- 查询语句
1 $merge([$,{
2 "timestamp":$millis(),
3 "a":100
4 }])
- 详解
$merge()函数可以将对象数组中所有对象合并为一个对象并且去除掉相同的key只保留最后一次的值,我们把原始消息作为数组的第一个元素,将需要添加的keys组成一个对象作为数组的第二个元素
语法 | 描述 |
---|---|
{"timestamp":$millis(),"a":100} | 将需要添加的所有元素放入一个对象,此例中添加属性timestamp和states |
[$,{"timestamp":$millis(),"a":100}] | 将原始消息 $ 与待添加元素对象放入同一个数组 |
$merge([$,{"timestamp":$millis(),"a":100}]) | 使用$merge()函数合并原始消息和待添加属性 |
"timestamp":$millis() | 添加一个时间戳属性 |
"a":100 | 添加一个常量属性 |
删除原始消息中指定属性
-
适用场景
原始消息中包含业务不需要的属性,通过指定keys删除指定属性
- 原始消息
1 {
2 "reqId": "442c1da4-9d3a-4f9b-a6e9-bfe858e4ac43",
3 "method": "thing.event.post",
4 "timestamp": 1596815775720,
5 "a":100,
6 "deviceName": "device001",
7 "productName": "product001",
8 "properties": {
9 "temperature": 35,
10 "humidity": 22
11 }
12 }
- 目标数据
删除原始消息中timestamp、a两个属性
1 {
2 "reqId":"442c1da4-9d3a-4f9b-a6e9-bfe858e4ac43",
3 "method":"thing.event.post",
4 "deviceName":"device001",
5 "productName":"product001",
6 "properties":{
7 "temperature":35,
8 "humidity":22
9 }
10 }
- 查询语句
1 $sift($, function($v,$k,$a) {$k!="timestamp" and $k!="a"})
- 详解
使用$sift()函数遍历对象所有属性并且对每个属性使用回调函数判断是否需要删除并返回过滤后的新对象。
语法 | 描述 |
---|---|
$k!="timestamp" and $k!="a" | 设置过滤条件,此例中我们删除timestamp和a两个属性 |
$sift($, function($v,$k,$a) {$k!="timestamp" and $k!="a"}) | 过滤原始消息,得到新对象 |
数据写入TSDB
设备上报数据存储到TSDB是物联网场景中比较常见的场景,规则引擎支持将设备数据转换为TSDB写入接口要求的格式。
设备数据通过规则引擎写入TSDB时必须转换为以下格式(具体格式要求请参考TSDB数据接口文档):
1 {
2 "datapoints": [{
3 "metric": "cpu_idle",
4 "tags": {
5 "host": "server1",
6 "rack": "rack1"
7 },
8 "timestamp": 1465376157007,
9 "value": 51
10 }, {
11 "metric": "cpu_idle",
12 "tags": {
13 "host": "server2",
14 "rack": "rack2"
15 },
16 "values": [
17 [1465376269769, 67],
18 [1465376325057, 60]
19 ]
20 }]
21 }
我们提供了多种数据格式写入TSDB的查询语句模板,可在规则「编辑调试」页点击「数据查询模板」查看并选择符合自己设备上报格式的模板编辑修改为最终的查询语句。
特别注意,在以TSDB作为规则目的地的场景下,查询语句的目的是为了构建TSDB写入接口支持的数据格式,任何可以构造出TSDB写入要求格式的查询语句均可,模板示例仅仅提供几种常见消息格式的转化思路,模板中未包含的原始消息格式,用户可以利用函数自行编写更多的语句
模板01-k:v格式、固定点数逐点配置
- 适用场景
适用于原始消息中data point点数固定场景,每个data point可指定tags、timestamp等。
- 原始消息
1{
2 "reported": {
3 "cpu_idle": 51,
4 "wind": {
5 "speed": [
6 [
7 1465376269769,
8 67
9 ],
10 [
11 1465376325057,
12 60
13 ]
14 ]
15 }
16 },
17 "host": "server2",
18 "rack": "rack2",
19 "lastupdate": 1465376157007
20}
- 目标数据
1{
2 "datapoints": [
3 {
4 "metric": "cpu_idle",
5 "tags": {
6 "host": "server2",
7 "rack": "rack2"
8 },
9 "timestamp": 1465376157007,
10 "value": 51
11 },
12 {
13 "metric": "wind",
14 "field": "speed",
15 "tags": {
16 "host": "server2",
17 "rack": "rack2"
18 },
19 "values": [
20 [
21 1465376269769,
22 67
23 ],
24 [
25 1465376325057,
26 60
27 ]
28 ]
29 }
30 ]
31}
- 查询语句
1 {
2 "datapoints": [
3 {
4 "metric": "cpu_idle",
5 "tags": {
6 "host": host,
7 "rack": rack
8 },
9 "timestamp": lastupdate,
10 "value": reported.cpu_idle
11 },
12 {
13 "metric": "wind",
14 "field": "speed",
15 "tags": {
16 "host": host,
17 "rack": rack
18 },
19 "timestamp": lastupdate,
20 "values": reported.wind.speed
21 }
22 ]
23 }
- 详解
使用基本的赋值语句直接构建目标数据格式。
模板02-k:v格式、单域-统一时间戳
- 适用场景
适用于数据点包含在对象中以{key:value}格式上报,且数据点个数可变场景。
- 原始消息
1 {
2 "reported": {
3 "key1": 100,
4 "key2": "rack33333"
5 },
6 "host": "server2",
7 "rack": "rack2",
8 "lastupdate": 1465376157007
9 }
- 目标数据
1 {
2 "datapoints": [
3 {
4 "metric": "key1",
5 "tags": {
6 "host": "server2",
7 "rack": "rack2"
8 },
9 "timestamp": 1465376157007,
10 "value": 100
11 },
12 {
13 "metric": "key2",
14 "tags": {
15 "host": "server2",
16 "rack": "rack2"
17 },
18 "timestamp": 1465376157007,
19 "value": "rack33333"
20 }
21 ]
22}
- 查询语句
1 {
2 "datapoints": [$each(reported, function($v, $k) {
3 {
4 "metric": $k,
5 "tags": {
6 "host": host,
7 "rack": rack
8 },
9 "timestamp": lastupdate,
10 "value": $v
11 }
12 })]
13 }
- 详解
利用$each()函数遍历reported对象所有元素,构建目标格式。
模板03-k:v格式、多域-统一时间戳
- 适用场景
适用于数据点包含在对象中以{key:value}格式上报,且数据点个数可变场景。
- 原始消息
1 {
2 "reported": {
3 "key1": 100,
4 "key2": "rack33333"
5 },
6 "host": "server2",
7 "rack": "rack2",
8 "lastupdate": 1465376157007
9}
- 目标数据
1 {
2 "datapoints": [
3 {
4 "metric": "cpu_idle",
5 "field": "key1",
6 "tags": {
7 "host": "server2",
8 "rack": "rack2"
9 },
10 "timestamp": 1465376157007,
11 "value": 100
12 },
13 {
14 "metric": "cpu_idle",
15 "field": "key2",
16 "tags": {
17 "host": "server2",
18 "rack": "rack2"
19 },
20 "timestamp": 1465376157007,
21 "value": "rack33333"
22 }
23 ]
24}
- 查询语句
1 {
2 "datapoints": [$each(reported, function($v, $k) {
3 {
4 "metric": "cpu_idle",
5 "field": $k,
6 "tags": {
7 "host": host,
8 "rack": rack
9 },
10 "timestamp": lastupdate,
11 "value": $v
12 }
13 })]
14}
- 详解
利用$each()函数遍历reported对象所有元素,构建目标格式。
模板04-k:v格式、单域-独立时间戳
- 适用场景
适用于数据点包含在对象中以key:value和key:time分别已对象格式上报,且数据点个数可变场景。
- 原始消息
1 {
2 "reported": {
3 "key1": 100,
4 "key2": "rack33333"
5 },
6 "host": "server2",
7 "rack": "rack2",
8 "lastupdate": {
9 "key1": 1465376157007,
10 "key2": 1465376157337
11 }
12}
- 目标数据
1 {
2 "datapoints": [
3 {
4 "metric": "key1",
5 "tags": {
6 "host": "server2",
7 "rack": "rack2"
8 },
9 "timestamp": 1465376157007,
10 "value": 100
11 },
12 {
13 "metric": "key2",
14 "tags": {
15 "host": "server2",
16 "rack": "rack2"
17 },
18 "timestamp": 1465376157337,
19 "value": "rack33333"
20 }
21 ]
22}
- 查询语句
1 {
2 "datapoints": [$each(reported, function($v, $k) {
3 {
4 "metric": $k,
5 "tags": {
6 "host": host,
7 "rack": rack
8 },
9 "timestamp": $lookup(lastupdate, $k),
10 "value": $v
11 }
12 })]
13}
- 详解
利用$each()函数遍历reported对象所有元素,同时使用$lookup()函数获取存储时间戳的对象lastupdate中对应的值赋值给timestamp。
模板05-k:v格式、多域-独立时间戳
- 适用场景
适用于数据点包含在对象中以key:value和key:time分别已对象格式上报,且数据点个数可变场景。
- 原始消息
1 {
2 "reported": {
3 "key1": 100,
4 "key2": "rack33333"
5 },
6 "host": "server2",
7 "rack": "rack2",
8 "lastupdate": {
9 "key1": 1465376157007,
10 "key2": 1465376157337
11 }
12}
- 目标数据
1 {
2 "datapoints": [
3 {
4 "metric": "cpu_idle",
5 "field": "key1",
6 "tags": {
7 "host": "server2",
8 "rack": "rack2"
9 },
10 "timestamp": 1465376157007,
11 "value": 100
12 },
13 {
14 "metric": "cpu_idle",
15 "field": "key2",
16 "tags": {
17 "host": "server2",
18 "rack": "rack2"
19 },
20 "timestamp": 1465376157337,
21 "value": "rack33333"
22 }
23 ]
24}
- 查询语句
1 {
2 "datapoints": [$each(reported, function($v, $k) {
3 {
4 "metric": 'cpu_idle',
5 "field": $k,
6 "tags": {
7 "host": host,
8 "rack": rack
9 },
10 "timestamp": $lookup(lastupdate, $k),
11 "value": $v
12 }
13 })]
14}
- 详解
利用$each()函数遍历reported对象所有元素,同时使用$lookup()函数获取存储时间戳的对象lastupdate中对应的值赋值给timestamp。
模板06-kvt格式数组、单域
- 适用场景
key:value数组格式写入适用于使用{key:x,value:y,time:1596538832000}数组方式批量上报数据场景,如断线续传、打包上传等。
- 原始消息
1 {
2 "reported": [
3 {
4 "k": "key1",
5 "v": "value1",
6 "t": 1465376157007
7 },
8 {
9 "k": "key2",
10 "v": 100,
11 "t": 1465376157337
12 }
13 ],
14 "host": "server2",
15 "rack": "rack2"
16}
- 目标数据
1 {
2 "datapoints": [
3 {
4 "metric": "key1",
5 "tags": {
6 "host": "server2",
7 "rack": "rack2"
8 },
9 "timestamp": 1465376157007,
10 "value": "value1"
11 },
12 {
13 "metric": "key2",
14 "tags": {
15 "host": "server2",
16 "rack": "rack2"
17 },
18 "timestamp": 1465376157337,
19 "value": 100
20 }
21 ]
22}
- 查询语句
1 {
2 "datapoints": [$map(reported, function($v, $i, $a) {
3 {
4 "metric": $v.k,
5 "tags": {
6 "host": host,
7 "rack": rack
8 },
9 "timestamp": $v.t,
10 "value": $v.v
11 }
12 })]
13}
- 详解
利用$map()函数遍历reported数组对象所有元素。
模板07-kvt格式数组、多域
- 适用场景
key:value数组格式写入适用于使用{key:x,value:y,time:1596538832000}数组方式批量上报数据场景,如断线续传、打包上传等。
- 原始消息
1 {
2 "reported": [
3 {
4 "k": "key1",
5 "v": "value1",
6 "t": 1465376157007
7 },
8 {
9 "k": "key2",
10 "v": 100,
11 "t": 1465376157337
12 }
13 ],
14 "host": "server2",
15 "rack": "rack2"
16}
- 目标数据
1 {
2 "datapoints": [
3 {
4 "metric": "cpu_idle",
5 "field": "key1",
6 "tags": {
7 "host": "server2",
8 "rack": "rack2"
9 },
10 "timestamp": 1465376157007,
11 "value": "value1"
12 },
13 {
14 "metric": "cpu_idle",
15 "field": "key2",
16 "tags": {
17 "host": "server2",
18 "rack": "rack2"
19 },
20 "timestamp": 1465376157337,
21 "value": 100
22 }
23 ]
24}
- 查询语句
1 {
2 "datapoints": [$map(reported, function($v, $i, $a) {
3 {
4 "metric": 'cpu_idle',
5 "field": $v.k,
6 "tags": {
7 "host": host,
8 "rack": rack
9 },
10 "timestamp": $v.t,
11 "value": $v.v
12 }
13 })]
14}
- 详解
利用$map()函数遍历reported数组对象所有元素。
模板08-k:vt格式数组、单域
- 适用场景
适用于使用{key:value,time:1596538832000}数组方式批量上报数据场景,如断线续传、打包上传等。
- 原始消息
1 {
2 "reported": [
3 {
4 "k1": "value1",
5 "t": 1465376157007
6 },
7 {
8 "k2": 100,
9 "t": 1465376157337
10 }
11 ],
12 "host": "server2",
13 "rack": "rack2"
14}
- 目标数据
1 {
2 "datapoints": [
3 {
4 "metric": "k1",
5 "tags": {
6 "host": "server2",
7 "rack": "rack2"
8 },
9 "timestamp": 1465376157007,
10 "value": "value1"
11 },
12 {
13 "metric": "k2",
14 "tags": {
15 "host": "server2",
16 "rack": "rack2"
17 },
18 "timestamp": 1465376157337,
19 "value": 100
20 }
21 ]
22}
- 查询语句
1 {
2 "datapoints": [$map(reported, function($v, $i, $a) {
3 {
4 "metric": $filter($keys($v), function($v, $i, $a) { $v!='t' }),
5 "tags": {
6 "host": host,
7 "rack": rack
8 },
9 "timestamp": $v.t,
10 "value": $lookup($v,$filter($keys($v), function($v, $i, $a) { $v!='t' }))
11 }
12 })]
13}
- 详解
利用$map()函数遍历reported对象所有元素,在赋值metric时使用$filter()函数过滤掉其他key从而获得需要存储的metric,在赋值value时使用$lookup()函数查询metric对应的value。
模板09-k:vt格式数组、多域
- 适用场景
适用于使用{key:value,time:1596538832000}数组方式批量上报数据场景,如断线续传、打包上传等。
- 原始消息
1 {
2 "reported": [
3 {
4 "k1": "value1",
5 "t": 1465376157007
6 },
7 {
8 "k2": 100,
9 "t": 1465376157337
10 }
11 ],
12 "host": "server2",
13 "rack": "rack2"
14}
- 目标数据
1 {
2 "datapoints": [
3 {
4 "metric": "cpu_idle",
5 "field": "key1",
6 "tags": {
7 "host": "server2",
8 "rack": "rack2"
9 },
10 "timestamp": 1465376157007,
11 "value": "value1"
12 },
13 {
14 "metric": "cpu_idle",
15 "field": "key2",
16 "tags": {
17 "host": "server2",
18 "rack": "rack2"
19 },
20 "timestamp": 1465376157337,
21 "value": 100
22 }
23 ]
24}
- 查询语句
1 {
2 "datapoints": [$map(reported, function($v, $i, $a) {
3 {
4 "metric": "cpu_idle",
5 "field": $filter($keys($v), function($v, $i, $a) { $v!='t' }),
6 "tags": {
7 "host": host,
8 "rack": rack
9 },
10 "timestamp": $v.t,
11 "value": $lookup($v, $filter($keys($v), function($v, $i, $a) { $v!='t' }))
12 }
13 })]
14}
- 详解
利用$map()函数遍历reported对象所有元素,在赋值metric时使用$filter()函数过滤掉其他key从而获得需要存储的metric,在赋值value时使用$lookup()函数查询metric对应的value。
过滤指定条件数据
根据设备固件版本处理消息
- 适用场景
当对设备进行功能升级时,新增功能带来设备上报的payload格式发生变化,此时在平台必须针对不同固件版本的设备使用不同的规则进行处理。
- 数据来源
假设设备上报数据的topic为$iot/123456/events,规则数据来源设置为:
1$iot/+/events
- 原始消息
在设备上报消息中增加表示固件版本的字段,示例中我们使用“version”来表示
1 {
2 "reqId":"442c1da4-9d3a-4f9b-a6e9-bfe858e4ac43",
3 "version":"1.0",
4 "deviceName":"device001",
5 "productName":"product001",
6 "properties":{
7 "temperature":35,
8 "humidity":22
9 },
10 "other":{
11 "Misc":null
12 }
13 }
- 过滤条件
1 version="1.0"
- 详解
利用设置的version="1.0"条件,仅当固件版本为1.0的设备上报的消息被当前规则的查询条件继续处理,其他版本设备可按相同方式设置多个规则分别进行处理。