Integrating with Spark SQL
1 BMR Spark SQL
1.1 Spark-tsdb-connector
TSDB integrates with Spark SQL by implementing org.apache.spark.rdd.RDD (Resilient Distributed Dataset) and a set of related interfaces, so that users can query TSDB data through Spark.
Jar download: http://tsdb-bos.gz.bcebos.com/spark-tsdb-connector-all.jar
For a local Spark cluster, download the jar to the local machine; when using BMR, upload it to BOS or use the address bos://iot-tsdb/spark-tsdb-connector-all.jar directly.
Supported versions: Spark 2.1.0, JDK 1.7.
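For orientation, the core usage pattern is sketched below: load a metric as a DataFrame through the connector, register it as a temp table, and query it with Spark SQL. This is a minimal sketch with placeholder endpoint, credentials, and a hypothetical metric name (demo_metric); the complete job programs follow in section 1.2.

package com.baidu.cloud.bmr.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class TsdbQuickStart {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TsdbQuickStart");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        Dataset<Row> dataset = sqlContext.read()
                .format("tsdb")                        // data source provided by spark-tsdb-connector
                .option("endpoint", "<endpoint>")      // placeholder: your TSDB instance endpoint
                .option("access_key", "<AK>")          // placeholder credentials
                .option("secret_key", "<SK>")
                .option("metric_name", "demo_metric")  // hypothetical metric name
                .load();
        dataset.registerTempTable("demo_metric");      // expose the metric as a SQL table
        sqlContext.sql("select count(1) from demo_metric").show();
    }

}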
1.2 Job programs
1.2.1 A generic job program for querying TSDB
Main class:
package com.baidu.cloud.bmr.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class TsdbSparkSql {

    public static void main(String[] args) {
        if (args.length != 6) {
            System.err.println("usage: spark-submit com.baidu.cloud.bmr.spark.TsdbSparkSql <endpoint> <ak> <sk> "
                    + "<metric> <sql> <output>");
            System.exit(1);
        }
        String endpoint = args[0];
        String ak = args[1];
        String sk = args[2];
        String metric = args[3];
        String sql = args[4];
        String output = args[5];

        SparkConf conf = new SparkConf().setAppName("TsdbSparkSql");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        Dataset<Row> dataset = sqlContext.read()
                .format("tsdb")                 // use the TSDB data source
                .option("endpoint", endpoint)   // TSDB instance endpoint
                .option("access_key", ak)       // access key (AK)
                .option("secret_key", sk)       // secret key (SK)
                .option("metric_name", metric)  // the metric to query
                .load();
        dataset.registerTempTable(metric);
        sqlContext.sql(sql).rdd().saveAsTextFile(output);  // run the SQL and save the result to output
    }

}
When the endpoint is in IP:PORT format:
package com.baidu.cloud.bmr.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class TsdbSparkSql {

    public static void main(String[] args) {
        if (args.length != 8) {
            System.err.println("usage: spark-submit com.baidu.cloud.bmr.spark.TsdbSparkSql <endpoint> <host> <grpc_port> <ak> <sk> "
                    + "<metric> <sql> <output>");
            System.exit(1);
        }
        String endpoint = args[0];
        String host = args[1];
        String grpcPort = args[2];
        String ak = args[3];
        String sk = args[4];
        String metric = args[5];
        String sql = args[6];
        String output = args[7];

        SparkConf conf = new SparkConf().setAppName("TsdbSparkSql");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        Dataset<Row> dataset = sqlContext.read()
                .format("tsdb")                 // use the TSDB data source
                .option("endpoint", endpoint)   // TSDB instance endpoint
                .option("host", host)           // host
                .option("grpc_port", grpcPort)  // gRPC port
                .option("access_key", ak)       // access key (AK)
                .option("secret_key", sk)       // secret key (SK)
                .option("metric_name", metric)  // the metric to query
                .load();
        dataset.registerTempTable(metric);
        sqlContext.sql(sql).rdd().saveAsTextFile(output);  // run the SQL and save the result to output
    }

}
Dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.2</version>
</dependency>
Package the program as a jar file and upload it to BOS; the jar's BOS path is needed when configuring the job.
1.2.2 A job program with more options
Main class:

package com.baidu.cloud.bmr.spark;

import static org.apache.spark.sql.types.DataTypes.DoubleType;
import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TsdbSparkSqlMoreOptions {

    public static void main(String[] args) {
        if (args.length != 6) {
            System.err.println("usage: spark-submit com.baidu.cloud.bmr.spark.TsdbSparkSqlMoreOptions <endpoint> <ak> <sk> "
                    + "<metric> <sql> <output>");
            System.exit(1);
        }
        String endpoint = args[0];
        String ak = args[1];
        String sk = args[2];
        String metric = args[3];
        String sql = args[4];
        String output = args[5];

        SparkConf conf = new SparkConf().setAppName("TsdbSparkSqlMoreOptions");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        StructType schema = new StructType(new StructField[] {
                new StructField("time", LongType, false, Metadata.empty()),    // time column, long
                new StructField("value", DoubleType, false, Metadata.empty()), // value column, double
                new StructField("city", StringType, false, Metadata.empty())   // city column, string
        });
        Dataset<Row> dataset = sqlContext.read()
                .format("tsdb")                  // use the TSDB data source
                .schema(schema)                  // set the custom schema
                .option("endpoint", endpoint)    // TSDB instance endpoint
                .option("access_key", ak)        // access key (AK)
                .option("secret_key", sk)        // secret key (SK)
                .option("metric_name", metric)   // the metric to query
                .option("field_names", "value")  // schema columns that are fields, comma-separated; e.g. "field1,field2" means two fields named field1 and field2
                .option("tag_names", "city")     // schema columns that are tags, comma-separated; e.g. "city,latitude" means two tags named city and latitude
                .option("split_number", "10")    // desired number of splits; the connector keeps the actual split count close to this value
                .load();
        dataset.registerTempTable(metric);
        sqlContext.sql(sql).rdd().saveAsTextFile(output);
    }

}
Dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.2</version>
</dependency>
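When supplying a custom schema, it is easy to mismatch field_names or tag_names against the actual metric. The sketch below, using only standard Spark APIs and the same placeholder options as above, previews the column mapping before running a full job; as in section 1.2.1, it loads without .schema(...) and lets the connector derive the columns.

package com.baidu.cloud.bmr.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class TsdbSchemaPreview {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TsdbSchemaPreview");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        Dataset<Row> dataset = sqlContext.read()
                .format("tsdb")                    // data source provided by spark-tsdb-connector
                .option("endpoint", "<endpoint>")  // placeholders as in the programs above
                .option("access_key", "<AK>")
                .option("secret_key", "<SK>")
                .option("metric_name", "<metric>")
                .load();                           // no .schema(...): the connector derives the columns
        dataset.printSchema();                     // check column names and types
        dataset.show(5);                           // preview a few rows to verify field/tag mapping
    }

}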
1.3 Creating a BMR Spark cluster
Before using BMR, we strongly recommend that you read the BMR documentation first.
Select BMR version 1.1.0 and choose Spark 2.1.0.
1.4 Creating a job
Configure the job as follows:
Application location: bos://<tsdb-spark-sql-sample>.jar

Spark-submit: --class com.baidu.cloud.bmr.spark.TsdbSparkSql --jars bos://<to>/spark-tsdb-connector-all.jar

Application arguments: <databaseName>.<databaseId>.tsdb.iot.gz.baidubce.com <AK> <SK> <metric> "select count(1) from <metric>" "bos://<to>/output/data"
Note that:
- The application configuration corresponds to the job programs in section 1.2; adjust it to match your own program.
- The trailing --jars argument of Spark-submit must not be omitted; it must point to the TSDB connector jar from section 1.1.
1.5 Scenario examples
1.5.1 Computing wind speed
Wind speed data is uploaded to TSDB periodically by sensors. Each data point contains two fields, x and y, for the wind speed along the x axis and the y axis. The program below computes the total wind speed from the two perpendicular components.
Main class:

package com.baidu.cloud.bmr.spark;

import static org.apache.spark.sql.types.DataTypes.DoubleType;
import static org.apache.spark.sql.types.DataTypes.LongType;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class WindSpeed {

    public static void main(String[] args) {
        String endpoint = "<endpoint>";
        String ak = "<AK>";
        String sk = "<SK>";
        String metric = "WindSpeed";
        String output = "bos://<to>/output/data";

        SparkConf conf = new SparkConf().setAppName("WindSpeed");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        StructType schema = new StructType(new StructField[] {
                new StructField("time", LongType, false, Metadata.empty()),  // time column, long
                new StructField("x", DoubleType, false, Metadata.empty()),   // x column, double
                new StructField("y", DoubleType, false, Metadata.empty())    // y column, double
        });
        Dataset<Row> dataset = sqlContext.read()
                .format("tsdb")                 // use the TSDB data source
                .schema(schema)                 // set the custom schema
                .option("endpoint", endpoint)   // TSDB instance endpoint
                .option("access_key", ak)       // access key (AK)
                .option("secret_key", sk)       // secret key (SK)
                .option("metric_name", metric)  // the metric to query
                .option("field_names", "x,y")   // schema columns that are fields
                .load();
        dataset.registerTempTable(metric);
        sqlContext.sql("select time, sqrt(pow(x, 2) + pow(y, 2)) as speed from WindSpeed")
                .rdd()
                .saveAsTextFile(output);
    }

}
Dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.2</version>
</dependency>
Raw data:
metric: WindSpeed

| time          | field : x | field : y |
| ------------- | --------- | --------- |
| 1512086400000 | 3.0       | 4.0       |
| 1512086410000 | 1.0       | 2.0       |
| 1512086420000 | 2.0       | 3.0       |
Result:
The result is written to the BOS folder specified by output; for the first row, sqrt(3.0² + 4.0²) = 5.0. Sample output:
[1512086400000,5.000]
[1512086410000,2.236]
[1512086420000,3.606]
1.5.2 Computing a vehicle's usage over time
While driving, a vehicle uploads data to TSDB periodically (every 10 seconds), including its speed. Three durations need to be computed:
(1) Stopped time: during the period the vehicle reported data, but the reported speed was 0; the car may have been waiting at a traffic light, for example.
(2) Running time: during the period the vehicle reported data and the reported speed was greater than 0; the car was on the road.
(3) Offline time: during the period the vehicle reported no data; the car was parked with the engine off. Since a point arrives every 10 seconds, each reported point accounts for 10 seconds of stopped or running time, and offline time is whatever remains of the day; see the sketch below.
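The queries in the program below bucket millisecond timestamps into 1-based day indexes and convert report counts into seconds. A minimal sketch of that arithmetic, with hypothetical sample values:

public class DayBucketSketch {

    public static void main(String[] args) {
        long windowStart = 1512057600000L;                       // start of the statistics window, as in the queries below
        long reportTime = 1512086400000L;                        // hypothetical report 8 hours into day 1
        long day = (reportTime - windowStart) / 86400000L + 1;   // 86400000 ms per day -> day 1
        long zeroSpeedReports = 361;                             // hypothetical count of reports with speed = 0 that day
        long stopSeconds = zeroSpeedReports * 10;                // each report covers 10 seconds -> 3610 seconds stopped
        System.out.println("day=" + day + ", stopSeconds=" + stopSeconds);
    }

}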
Main class:

package com.baidu.cloud.bmr.spark;

import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class VehicleSpeed {

    public static void main(String[] args) {
        String endpoint = "<endpoint>";
        String ak = "<AK>";
        String sk = "<SK>";
        String metric = "vehicle";
        String output = "bos://<to>/output/data";

        SparkConf conf = new SparkConf().setAppName("VehicleSpeed");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        StructType schema = new StructType(new StructField[] {
                new StructField("time", LongType, false, Metadata.empty()),    // time column, long
                new StructField("speed", LongType, false, Metadata.empty()),   // speed column, long
                new StructField("carId", StringType, false, Metadata.empty())  // carId column, string
        });
        Dataset<Row> dataset = sqlContext.read()
                .format("tsdb")                  // use the TSDB data source
                .schema(schema)                  // set the custom schema
                .option("endpoint", endpoint)    // TSDB instance endpoint
                .option("access_key", ak)        // access key (AK)
                .option("secret_key", sk)        // secret key (SK)
                .option("metric_name", metric)   // the metric to query
                .option("field_names", "speed")  // schema columns that are fields
                .option("tag_names", "carId")    // schema columns that are tags
                .load();
        dataset.registerTempTable(metric);
        // Each report covers 10 seconds, so count(*) * 10 is the per-day duration in seconds.
        sqlContext.sql("select floor((time - 1512057600000) / 86400000) + 1 as day, count(*) * 10 as stop_seconds"
                + " from vehicle where carId='123' and time >= 1512057600000 and time < 1514736000000 and speed = 0"
                + " group by floor((time - 1512057600000) / 86400000)")
                .rdd()
                .saveAsTextFile(output + "/stopSeconds");
        sqlContext.sql("select floor((time - 1512057600000) / 86400000) + 1 as day, count(*) * 10 as run_seconds"
                + " from vehicle where carId='123' and time >= 1512057600000 and time < 1514736000000 and speed > 0"
                + " group by floor((time - 1512057600000) / 86400000)")
                .rdd()
                .saveAsTextFile(output + "/runSeconds");
        // Offline time per day is what is left of the 86400 seconds after all reported points are subtracted.
        sqlContext.sql("select floor((time - 1512057600000) / 86400000) + 1 as day, 86400 - count(*) * 10 as"
                + " offline_seconds from vehicle where carId='123' and time >= 1512057600000 and time < 1514736000000"
                + " group by floor((time - 1512057600000) / 86400000)")
                .rdd()
                .saveAsTextFile(output + "/offlineSeconds");
    }

}
Dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.2</version>
</dependency>
Raw data:
metric: vehicle

| time          | field : speed | tag       |
| ------------- | ------------- | --------- |
| 1512057600000 | 40            | carId=123 |
| 1512057610000 | 60            | carId=123 |
| 1512057620000 | 50            | carId=123 |
| ...           | ...           | carId=123 |
| 1514721600000 | 10            | carId=123 |
Result:
The results are written to the BOS folder specified by output; the three groups below come from the stopSeconds, runSeconds, and offlineSeconds directories, respectively. Sample output:
[1,3612]
[2,3401]
...
[31,3013]

[1,17976]
[2,17968]
...
[31,17377]

[1,64812]
[2,65031]
...
[31,66010]