Flink 使用指南
更新时间:2024-08-15
Flink
flink-bos-hadoop 是百度云对象存储系统 BOS 针对 Flink 的文件系统实现,并且支持了 recoverwriter 接口,Flink 可以基于该文件系统实现读写 BOS 上的数据以及作为流应用的状态后端。
安装
1.Flink 环境准备
以 1.15.0 版本为例。
Bash
1# 下载到一个路径
2wget https://archive.apache.org/dist/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
3
4#解压缩
5tar zxvf flink-1.15.0-bin-scala_2.12.tgz
2.添加依赖 jar 包和配置
在启动 Flink 之前,下载 flink-bos-hadoop 插件。
Bash
1# flink-bos-hadoop插件导入flink下
2mkdir ./plugins/bos-fs-hadoop
3cp flink-bos-hadoop-1.15.0-0.1.0.jar ./plugins/bos-fs-hadoop/
4
5# 访问BOS的一些必要配置
6vim ./conf/flink-conf.yaml
7...
8cat ./conf/flink-conf.yaml
9fs.bos.impl: org.apache.hadoop.fs.bos.BaiduBosFileSystem
10fs.AbstractFileSystem.bos.impl: org.apache.hadoop.fs.bos.BOS
11fs.bos.access.key: {your ak}
12fs.bos.secret.access.key: {your sk}
13fs.bos.endpoint: bj.bcebos.com {your bucket endpoint}
使用
启动
Bash
1./bin/start-cluster.sh
提交作业
Bash
1# 通过以下格式指定路径,BOS 对象可类似于普通文件使用:bos://<your-bucket>/{object-name}
2./bin/flink run examples/streaming/WordCount.jar --input "bos://my_bucket/students.txt" --output "bos://my_bucket/out"
查看运行结果
Bash
1# 查看wordcount统计结果
2$ hadoop fs -ls bos://my_bucket/out
3Found 1 items
4drwxrwxrwx - 0 1970-01-01 08:00 bos://my_bucket/out/2023-08-10--15
5
6$ hadoop fs -ls bos://my_bucket/out/2023-08-10--15/
7Found 1 items
8-rw-rw-rw- 1 1792 2023-08-10 15:52 bos://my_bucket/out/2023-08-10--15/part-3053774f-2d8e-40c5-aa3c-01402ce4b6b4-0
9
10$ hadoop fs -cat bos://my_bucket/out/2023-08-10--15/part-3053774f-2d8e-40c5-aa3c-01402ce4b6b4-0
11(name,1)
12(studentname,1)
13(age,1)
14...