工作流操作
更新时间:2019-08-07
Workflow即媒资处理工作流,由stage以及stage的依赖关系构成;Workflow名称在单个用户下具有唯一性,即用户创建的工作流名称不能重复。
新建工作流
创建工作流需要构建工作流内的stage和stage的依赖关系。使用如下代码可以新建工作流,代码中给出了创建一个简单工作流和复杂工作流的方法。
Java
1private void createWorkflow(BvwClient client) {
2 client.createWorkflow(createSimpleWorkflowRequest("simple_workflow"));
3 client.createWorkflow(createComplexWorkflowRequest("complex_workflow"));
4}
5
6// 创建一个简单的工作流
7private WorkflowCreateRequest createSimpleWorkflowRequest(String workflowName) {
8 // 创建开始节点(start stage)
9 StageParamModel startStageParam = StageParamModel.of("{}");
10 StageModel startStage = StageModel.of("start", startStageParam, StageType.START);
11 // 创建转码节点(transcoding stage)
12 StageParamModel transcodingStageParam = StageParamModel.of(
13 "{\"job\":{\"pipelineName\":\"test_transcoding\",\"source\":{},\"target\":{\"presetName\":"
14 + "\"bvwtest\",\"targetBucket\":\"videoworks-source\"}},\"needDetectBlackBoard\":true,"
15 + "\"adjustOrientation\":\"ALL\"}");
16 StageModel transcodingStage = StageModel.of("transcoding", transcodingStageParam,
17 StageType.TRANSCODING);
18 // 创建发布节点(publish stage)
19 StageParamModel publishStageParam = StageParamModel.of("{}");
20 StageModel publishStage = StageModel.of("publish", publishStageParam, StageType.PUBLISH);
21
22 // 设置节点
23 Map<String, StageModel> stages = Maps.newHashMap();
24 stages.put(startStage.getName(), startStage);
25 stages.put(transcodingStage.getName(), transcodingStage);
26 stages.put(publishStage.getName(), publishStage);
27
28 // 设置节点依赖关系
29 Map<String, List<String>> dependencies = Maps.newHashMap();
30 dependencies.put(startStage.getName(), Lists.newArrayList(transcodingStage.getName());
31 dependencies.put(transcodingStage.getName(), Lists.newArrayList(publishStage.getName()));
32 dependencies.put(publishStage.getName(), Lists.newArrayList());
33
34 // 构建WorkflowCreateRequest
35 DagModel dag = DagModel.of(stages, dependencies);
36 return WorkflowCreateRequest.of(workflowName, dag);
37}
38
39// 创建一个复杂的工作流
40private WorkflowCreateRequest createComplexWorkflowRequest(String workflowName) {
41 // 创建开始节点(start stage)
42 StageParamModel startStageParam = StageParamModel.of("{}");
43 StageModel startStage = StageModel.of("start", startStageParam, StageType.START);
44 // 创建元信息提取节点(mediainfo stage)
45 StageParamModel mediaInfoStageParam = StageParamModel.of("{}");
46 StageModel mediaInfoStage = StageModel.of("mediaInfo", mediaInfoStageParam, StageType.MEDIAINFO);
47 // 创建黑边检测节点(blackborder stage)
48 StageParamModel blackBorderDetectStageParam = StageParamModel.of("{}");
49 StageModel blackBorderDetectStage = StageModel.of("blackBorderDetect", blackBorderDetectStageParam,
50 StageType.BLACK_BORDER_DETECT);
51 // 创建转码节点(transcoding stage)
52 StageParamModel transcodingStageParam = StageParamModel.of(
53 "{\"job\":{\"pipelineName\":\"test_transcoding\",\"source\":{},\"target\":{\"presetName\":"
54 + "\"bvwtest\",\"targetBucket\":\"videoworks-source\"}},\"needDetectBlackBoard\":true,"
55 + "\"adjustOrientation\":\"ALL\"}");
56 StageModel transcodingStage = StageModel.of("transcoding", transcodingStageParam,
57 StageType.TRANSCODING);
58 // 创建缩略图节点(thumbnail stage)
59 StageParamModel thumbnailStageParam = StageParamModel.of(
60 "{\"notificationName\":\"test\",\"job\":{\"pipelineName\":\"test_thumbnail\",\"target\":"
61 + "{\"targetBucket\":\"videoworks-source\",\"format\":\"jpg\",\"sizingPolicy\":\"keep\"},"
62 + "\"capture\":{\"mode\":\"auto\"}}}");
63 StageModel thumbnailStage = StageModel.of("thumbnail", thumbnailStageParam, StageType.THUMBNAIL);
64 // 创建发布节点(publish stage)
65 StageParamModel publishStageParam = StageParamModel.of("{}");
66 StageModel publishStage = StageModel.of("publish", publishStageParam, StageType.PUBLISH);
67
68 // 设置节点
69 Map<String, StageModel> stages = Maps.newHashMap();
70 stages.put(startStage.getName(), startStage);
71 stages.put(mediaInfoStage.getName(), mediaInfoStage);
72 stages.put(blackBorderDetectStage.getName(), blackBorderDetectStage);
73 stages.put(transcodingStage.getName(), transcodingStage);
74 stages.put(thumbnailStage.getName(), thumbnailStage);
75 stages.put(publishStage.getName(), publishStage);
76
77 // 设置节点依赖关系
78 Map<String, List<String>> dependencies = Maps.newHashMap();
79 dependencies.put(startStage.getName(), Lists.newArrayList(blackBorderDetectStage.getName(),
80 mediaInfoStage.getName(),
81 thumbnailStage.getName()));
82 dependencies.put(blackBorderDetectStage.getName(), Lists.newArrayList(transcodingStage.getName()));
83 dependencies.put(transcodingStage.getName(), Lists.newArrayList(publishStage.getName()));
84 dependencies.put(mediaInfoStage.getName(), Lists.newArrayList(publishStage.getName()));
85 dependencies.put(thumbnailStage.getName(), Lists.newArrayList(publishStage.getName()));
86 dependencies.put(publishStage.getName(), Lists.newArrayList());
87
88 // 构建WorkflowCreateRequest
89 DagModel dag = DagModel.of(stages, dependencies);
90 return WorkflowCreateRequest.of(workflowName, dag);
91}
- 创建工作流的stage时,每个stage的参数信息都是JSON序列化后的字符串。字符串的内容可以参考VideoWorks服务控制台的"媒资列表"页面 > "媒资执行实例" > "任务结果信息"中的输入信息。如下图是一个转码节点的输入。
- 工作流的stage信息和stage依赖信息中的每个stage名称需要一一对应,否则创建工作流将失败。
删除工作流
使用如下代码可以删除指定名称的工作流。
Java
1private void deleteWorkflow(BvwClient client, String workflowName) {
2 client.deleteWorkflow(workflowName);
3}
只有未被媒资处理使用的工作流才允许删除,否则将报错400,"workflow is using, can not modify"。
更新工作流
使用如下代码可以更新工作流的stage信息。
Java
1private void updateWorkflow(BvwClient client) {
2 client.updateWorkflow(updateWorkflowRequest("update_workflow"));
3}
4
5// 创建更新的工作流请求
6private WorkflowUpdateRequest updateWorkflowRequest(String workflowName) {
7 // 创建开始节点(start stage)
8 StageParamModel startStageParam = StageParamModel.of("{}");
9 StageModel startStage = StageModel.of("start", startStageParam, StageType.START);
10 // 创建转码节点(transcoding stage)
11 StageParamModel transcodingStageParam = StageParamModel.of(
12 "{\"job\":{\"pipelineName\":\"test_transcoding\",\"source\":{},\"target\":{\"presetName\":"
13 + "\"bvwtest\",\"targetBucket\":\"videoworks-source\"}},\"needDetectBlackBoard\":true,"
14 + "\"adjustOrientation\":\"ALL\"}");
15 StageModel transcodingStage = StageModel.of("transcoding", transcodingStageParam,
16 StageType.TRANSCODING);
17 // 创建发布节点(publish stage)
18 StageParamModel publishStageParam = StageParamModel.of("{}");
19 StageModel publishStage = StageModel.of("publish", publishStageParam, StageType.PUBLISH);
20
21 // 设置节点
22 Map<String, StageModel> stages = Maps.newHashMap();
23 stages.put(startStage.getName(), startStage);
24 stages.put(transcodingStage.getName(), transcodingStage);
25 stages.put(publishStage.getName(), publishStage);
26
27 // 设置节点依赖关系
28 Map<String, List<String>> dependencies = Maps.newHashMap();
29 dependencies.put(startStage.getName(), Lists.newArrayList(transcodingStage.getName()));
30 dependencies.put(transcodingStage.getName(), Lists.newArrayList(publishStage.getName()));
31 dependencies.put(publishStage.getName(), Lists.<String>newArrayList());
32
33 // 构建WorkflowCreateRequest
34 DagModel dag = DagModel.of(stages, dependencies);
35 return WorkflowUpdateRequest.of(workflowName, dag);
36}
只有没有被媒资处理使用的工作流才允许更新,否则将报错400,"workflow is using, can not modify"。
查询工作流
使用如下代码可以查询一个工作流。
Java
1private void getWorkflow(BvwClient client, String workflowName) {
2 client.getWorkflow(workflowName);
3}
查询工作流列表
使用如下代码可以查询工作流列表。
Java
1private void listWorkflow(BvwClient client) {
2 int pageNo = 1;
3 int pageSize = 10;
4 String begin = "2019-06-30T16:00:00Z";
5 String end = "2019-07-30T16:00:00Z";
6 String workflowNameFuzzy = "test";
7 WorkflowListRequest request = WorkflowListRequest.of(pageNo, pageSize, WorkflowStatus.NORMAL,
8 workflowNameFuzzy, begin, end);
9 ListByPageResponse<WorkflowListResponse> response = client.listWorkflow(request);
10}
查询工作流列表的参数只有pageNo和pageSize是必选项,其他参数为可选项。
启用工作流
使用如下代码可以启用一个工作流。
Java
1private void enableWorkflow(BvwClient client, String workflowName) {
2 client.enableWorkflow(workflowName);
3}
禁用工作流
使用如下代码可以禁用一个工作流。
Java
1private void disableWorkflow(BvwClient client, String workflowName) {
2 client.disableWorkflow(workflowName);
3}