Step(作业)
更新时间:2022-04-15
概述
作业是和集群相关联的资源,对作业的操作需要指定相关集群的ID。
添加steps
BMR支持多种类型的作业,不同类型的作业有不同的配置项。如下代码可向指定的hadoop类型的集群添加Custom Jar、Streaming、Hive、Pig作业。
添加作业可以通过配置AddStepsRequest对象的clientToken属性来保证创建请求的幂等性。clientToken是一个长度不超过64位的ASCII字符串,配置AddStepsRequest对象的clientToken方法是:addStepsRequest.withClientToken(clientToken)
。
请求返回的AddStepsResponse对象包含了新创建作业ID的数组List<String>
,获取方法为response.getStepIds()
。
Plain Text
1public void addSteps(BmrClient bmrClient, String clusterId) {
2 List<StepConfig> steps = new ArrayList<StepConfig>();
3 // Custom Jar作业
4 steps.add(
5 new JavaStepConfig()
6 .withName("java-step")
7 .withActionOnFailure("Continue")
8 .withJar("bos://benchmark/hadoop/hadoop-mapreduce-examples.jar")
9 .withMainClass("org.apache.hadoop.examples.WordCount")
10 .withArguments("bos://path/to/input bos://path/to/java_output")
11 );
12
13 // Streaming作业
14 steps.add(
15 new StreamingStepConfig()
16 .withName("streaming-step")
17 .withActionOnFailure("Continue")
18 .withInput("bos://path/to/input_streaming")
19 .withMapper("cat")
20 .withOutput("bos://path/to/output_streaming")
21 );
22
23 List<AdditionalFile> additionalFiles = new ArrayList<AdditionalFile>();
24 additionalFiles.add(new AdditionalFile().withRemote("bos://path/to/testA.jar").withLocal("testB.jar"));
25
26 // 使用附加文件的Streaming作业
27 steps.add(
28 new StreamingStepConfig()
29 .withName("streaming-step2")
30 .withActionOnFailure("Continue")
31 .withInput("bos://path/to/input_streaming2")
32 .withMapper("cat")
33 .withReducer("cat")
34 .withOutput("bos://path/to/output_streaming2")
35 .withAdditionalFiles(additionalFiles)
36 );
37
38 // Hive作业
39 steps.add(
40 new HiveStepConfig()
41 .withName("hive-step")
42 .withActionOnFailure("Continue")
43 .withScript("bos://path/to/hive/hql/hive_src.hql")
44 .withInput("bos://path/to/hive/data/hive_src.data")
45 .withOutput("bos://path/to/output_hive")
46 .withArguments("--hivevar LOCAT=bos://chy3/hive/tables/src")
47 );
48
49 // Pig作业
50 steps.add(
51 new PigStepConfig()
52 .withName("pig-step")
53 .withActionOnFailure("Continue")
54 .withScript("bos://path/to/pig/script/pig_grep.pig")
55 .withInput("bos://path/to/pig/data/pig_grep.data")
56 .withOutput("bos://path/to/output_pig")
57 );
58
59 //Spark作业
60 steps.add(
61 new SparkStepConfig()
62 .withName("spark-step")
63 .withActionOnFailure("Continue")
64 .withJar("bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar")
65 .withSubmitOptions("--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer")
66 .withArguments("bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out")
67 );
68
69
70try {
71 AddStepsResponse response = bmrClient.addSteps(
72 new AddStepsRequest().withClusterId(clusterId)
73 .withSteps(steps)
74 );
75 // 输出各个添加的作业ID
76 for (String stepId : response.getStepIds()) {
77 System.out.println(stepId);
78 }
79} catch (BceServiceException e) {
80 System.out.println("Add steps failed: " + e.getErrorMessage());
81} catch (BceClientException e) {
82 System.out.println(e.getMessage());
83}
84}
列出全部steps
如下代码可以罗列出指定集群上的全部作业,用户可以通过配置查询参数pageNo和pageSize来限制每次请求返回的作业数目和查询记录的起点。
Plain Text
1public void listSteps(BmrClient bmrClient, String clusterId) {
2 int maxKeys = 10;
3 try {
4 // 罗列指定集群ID相关的作业
5 ListStepsRequest request = new ListStepsRequest().withClusterId(clusterId);
6 ListStepsResponse response = bmrClient.listSteps(request);
7
8 // 输出各个作业的状态
9 for (Step step : response.getSteps()) {
10 System.out.println(step.getStatus().getState());
11 }
12 } catch (BceServiceException e) {
13 System.out.println("List steps failed: " + e.getErrorMessage());
14 }
15}
请求返回的ListStepsResponse对象包含了相关的集群对象数组List<Step>
, 获取集群对象数组的方法为response.getSteps()
。作业对象Step的属性包括了作业相关的配置信息,每个属性均有对应的getter访问器方法。
Plain Text
1public class Step {
2 private String id; // 作业ID
3 private String actionOnFailure; // 作业失败策略
4 private String type; // 作业类型
5 private Map<String, String> properties; // 作业描述
6 private String name; // 作业名称
7 private StepStatus status; // 作业状态
8 }
9
10public class StepStatus {
11 private String createDateTime; // 作业提交时间
12 private String endDateTime; // 作业结束时间
13 private String startDateTime; // 作业开始执行的时间
14 private String state; // 作业状态字段
15}
查询指定的step
如下代码可以查看指定作业的信息:
请求返回的GetStepResponse对象包含了获取作业属性的getter访问器方法,可以直接调用response的访问器方法来获得目标作业的属性信息。
Plain Text
1public void getStep(BmrClient bmrClient, String clusterId, String stepId) {
2 try {
3 // 方法 1. 查询指定集群ID、作业ID对应作业的信息
4 GetStepResponse response1 = bmrClient.getStep(clusterId, stepId);
5
6 // 方法 2. 自定义GetStepRequest对象的查询请求
7 GetStepResponse response2 = bmrClient.getStep(
8 new GetStepRequest().withClusterId(clusterId).withStepId(stepId)
9 );
10 // 输出作业的状态信息
11 System.out.println(response1.getStatus().getState());
12 } catch (BceServiceException e) {
13 System.out.println("Describe steps failed: " + e.getErrorMessage());
14 }
15}