Filebeat接入Kafka专享版
更新时间:2024-01-03
Filebeat
Filebeat是用于转发和集中日志数据的轻量级传送工具。Filebeat监视您指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或 Logstash进行索引。
Filebeat的工作方式如下:启动Filebeat时,它将启动一个或多个输入,这些输入将在为日志数据指定的位置中查找。对于Filebeat所找到的每个日志,Filebeat都会启动收集器。每个收集器都读取单个日志以获取新内容,并将新日志数据发送到libbeat,libbeat将聚集事件,并将聚集的数据发送到为Filebeat配置的输出。
工作流程图如下:
前提条件
- 下载并安装Filebeat(参见Download Filebeat)
- 下载并安装1.8或以上版本 JDK
- 已创建消息服务 for kafka集群
Filebeat接入
步骤一:获取接入点
具体请参考查看集群接入点
SSL/SASL方式下载证书参考:如何下载证书?。
步骤二:创建/获取主题
步骤三:Filebeat发送消息
准备配置文件
进入 Filebeat 的安装目录,创建配置监控文件 filebeat.yml,参数详情可参考Filebeat官网。
YAML
1filebeat.inputs:
2
3- input_type: log
4
5# 此处为监听文件路径
6 paths:
7 - /var/log/*.log
8#------------------------------- Kafka output ----------------------------------
9output.kafka:
10 # 是否启用
11 enabled: true
12
13 # The list of Kafka broker addresses from which to fetch the cluster metadata.
14 # The cluster metadata contain the actual Kafka brokers events are published
15 # to
16 # 获取集群元数据的 Kafka 代理地址列表(接入点)
17 hosts: ["x.x.x.x:9095","x.x.x.x:9095","x.x.x.x:9095"]
18
19 # 用于生成事件的 Kafka 主题, 可以使用任何事件 field 的格式字符串, 若要从文档类型设置主题,请使用 `%{[type]}`.
20 topic: test
21
22 # 设置sasl协议字段
23 sasl.mechanism: "SCRAM-SHA-512"
24
25 # 设置 SSL协议字段
26 #SSL/SASL_SSL方式请下载证书文件: kafka-key.zip
27 #ssl.certificate_authorities:[/***/client_ssl.properties]
28 partition.random:
29 # If enabled, events will only be published to partitions with reachable leaders. Default is false.
30 # reachable_only 设置为true,则事件将仅发布到可用的分区
31 # 必须是 random, round_robin, hash 三种的一种
32 # 默认为 false
33 reachable_only: false
34
35 # Authentication details. Password is required if username is set.
36 # 身份验证的细节。如果设置了用户名,则需要密码。
37 # 没有设置可保持空值
38 username: ''
39 password: ''
40
41 # Sets the output compression codec. Must be one of none, snappy and gzip. The default is gzip.
42 # 设置输出压缩编解码器
43 # 必须是 none, snappy 和 gzip 中的一个
44 # 默认是 gzip
45 compression: none
46
47 # Set the compression level. Currently only gzip provides a compression level between 0 and 9. The default value is chosen by the compression algorithm.
48 # 设置压缩级别
49 # 目前只有 gzip 提供 0 到 9 之间的压缩级别
50 # 默认值由压缩算法选择
51 compression_level: 4
52
53 # The maximum permitted size of JSON-encoded messages. Bigger messages will be dropped. The default value is 1000000 (bytes). This value should be equal to r less than the broker's message.max.bytes.
54 # json编码消息的最大允许大小, 更大的消息将被删除。默认值是 1000000 ( 字节 ) 。这个值应该等于 r,小于 broker 的 message.max.bytes
55 max_message_bytes: 1000000
56 # The ACK reliability level required from broker. 0=no response, 1=wait for local commit, -1=wait for all replicas to commit. The default is 1. Note: If set to 0, no ACKs are returned by Kafka. Messages might be lost silently on error.
57 # 代理要求的ACK可靠性级别
58 # 0=无响应,1=等待本地提交,-1=等待所有副本提交
59 # 默认值是1
60 # 注意:如果设置为0,Kafka不会返回任何ack。出错时,消息可能会悄无声息地丢失。
61 required_acks: 1
62 # The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats".
63 # 可配置的ClientID,用于日志记录、调试和审计。默认是“beats”。
64 client_id: beats
Filebeat 发送消息
在filebeat安装路径执行如下命令
Shell
1./filebeat -e -c filebeat.yml
步骤四:Filebeat消费消息
准备配置文件
YAML
1filebeat.inputs:
2- type: kafka
3 hosts:
4 - ip:端口
5 - ip:端口
6 - ip:端口
7
8 # 身份验证的细节。如果设置了用户名,则需要密码。
9 # 没有设置可保持空值
10 username: ""
11 password: ""
12 #Topic的名称。
13 topics: ["filebeat_test"]
14 #Group的名称。
15 group_id: "filebeat_group"
16 #证书所在位置
17 #SSL/SASL_SSL方式请下载证书文件: kafka-key.zip
18 #ssl.certificate_authorities:[/***/client_ssl.properties]
19
20 ssl.verification_mode: none
21
22output.console:
23 pretty: true
Filebeat消费消息
在filebeat安装路径执行如下命令
Shell
1./filebeat -c ./input.yml
步骤五:查看集群监控
查看消息是否发送成功或消费成功有两种方式:
- 在服务器端查看jar包运行日志。
- 在专享版消息服务 for Kafka控制台查看集群监控,获取集群生产、消息情况。
推荐使用第二种方式,下面介绍如何查看集群监控。
(1)在专享版消息服务 for Kafka的控制台页面找到需要连接的集群,点击集群名称进入集群详情页面。
(2)页面跳转后,进入左侧边中的集群详情页面。
(3)点击左侧边栏中的集群监控,进入集群监控页面。
(4)通过查看集群监控页面,提供的不同纬度的监控信息(集群监控、节点监控、主题监控、消费组监控),即可获知集群的生产和消费情况。
集群监控的具体使用请参考:集群监控