Rocketmq触发器使用示例
更新时间:2025-07-14
概述
CFC 支持配置 Rocketmq 触发器,配置后可以订阅指定的 Rocketmq topic 进行消息消费并配合函数进行消息处理。以下是创建 Rocketmq 触发器的一般步骤。
创建 Rocketmq 触发器
前提
- 创建 Rocketmq 集群,参考:https://cloud.baidu.com/doc/RocketMQ/s/rm4jbrmf4
- 创建主题(后续 Rocketmq 触发器需要订阅)
- 创建消费组(最好为 CFC 单独创建一个消费组,避免和业务消费组混用)
- 创建用户,如果在 Rocketmq 集群中设置了认证方式为『SASL/PLAIN身份认证』,必须要创建具体的用户并设置好用户的密码以及订阅权限,Rocketmq 触发器中需要使用该用户信息进行订阅处理。
创建函数
用户可以为新建的函数或已有函数配置 Rocketmq 触发器,创建函数的流程可以具体参考从头创建函数,这里不再赘述。
编写处理函数
- 登录管理控制台,选择“产品服务>云函数计算 CFC”,进入“函数列表”页面
- 在“函数列表”页面。选择具体的函数,进入函数详情页面。
在函数详情页中编写百度消息服务触发器对应的 handler,以对触发操作返回适当的响应,之后点击右下角保存按钮完成函数的修改操作。
Plain Text
1# -*- coding: utf-8 -*-
2import base64
3import json
4
5def handler(event, context):
6 for record in event['Records']:
7 # kafka value is base64 encoded so decode here
8 payload = base64.b64decode(record['Rocketmq']['Value'])
9 print("Decoded payload: " + payload.decode("utf-8"))
10 return 'Successfully processed {} records.'.format(len(event['Records']))
注:Rocketmq 触发器的event消息体
Plain Text
1{
2 "Records": [
3 {
4 "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110",
5 "Rocketmq": {
6 "Key": "", //base64后的key
7 "Value": "MA==", //base64后的value
8 "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
9 "Partition": 0,
10 "Offset": 43110,
11 "Timestamp": 1553151704.6529999
12 },
13 "EventName": "bce:rocketmq:record",
14 "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
15 "EventSource": "bce:rocketmq"
16 },
17 {
18 "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111",
19 "Rocketmq": {
20 "Key": "",
21 "Value": "MQ==",
22 "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
23 "Partition": 0,
24 "Offset": 43111,
25 "Timestamp": 1553151704.6529999
26 },
27 "EventName": "bce:rocketmq:record",
28 "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
29 "EventSource": "bce:rocketmq"
30 }
31 ]
32}
配置 Rocketmq 触发器
- 登录管理控制台,选择“产品服务> 函数计算 CFC”,进入“函数列表”页面
- 点击需要添加 Rocketmq 触发器的函数,进入函数详情页面。
- 点击左侧导航栏中的“触发器”,进入函数配置页面。
- 在函数配置页面中最下方点击“新增触发器”。
- 在弹出框中,点击下拉框“选择事件源进行添加”,选择 Rocketmq 触发器。(目前只支持北京和广州地域)
- 之后在弹出框中配置好选项,并点击确认,完成触发器的创建。
- 集群选择要监听的 Rocketmq 集群
- Topic 选择将要监听的 Rocketmq 服务 Topic
- 消费组选择监听消息使用的消费组
-
认证模式根据 Rocketmq 的认证模式进行
- 如果Rocketmq 没有配置认证模式,这里选择『无认证』即可
- 如果Rocketmq 设置了认证方式为『SASL/PLAIN身份认证』,则在触发器中也要选择『ACL 认证』,明天写Rocketmq 用户名和密码(需要个 Rocketmq 中的用户名和密码对应)
- 批处理大小:从 Topic 中一次读取的最大记录数,范围:1-32
- 起始位置:在 Topic 中开始读取的位置,最新记录对应 Rocketmq 的 OffsetNewest,最老记录对应 Rocketmq 的 OffsetOldest
- 启用触发器:是否直接启动触发器,建议先关闭触发器以便测试
特别注意:
- Rocketmq 触发器配置的消费组尽量独立,避免与业务消费组重复使用。
- Rocketmq 一个消费组只能订阅一个 topic,如果同时订阅多个 topic,会出现部分 topic 无法消费的情况,导致出错。所以 CFC 侧对此进行了限制,单一 Rocketmq 集群同一个消费组单地域保持唯一,只可以在一个函数中配置一次,建议使用不同的消费组来订阅不同的 topic。
创建完成后,可以在触发器中看到创建好的 Rocketmq 触发器信息。
测试触发器
模拟测试
- 点击测试按钮
- 事件模板选择 Rocketmq 触发器
- 点击执行进行测试,可以看到执行结果
真实测试
- 启动 Rocketmq 触发器
- 设置日志存储
- 向 Rocketmq 触发器中发送消息,在日志中可以看到执行结果