掘金 人工智能 10月28日 07:24
RocketMQ 新特性赋能异步化 Agent 事件驱动架构
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍了如何基于 Apache RocketMQ 的新特性构建异步化 Multi-Agent 系统。演讲者阐述了 Agentic AI 时代的挑战,强调了 Agent 能力发现与任务闭环的重要性。文章详细介绍了 RocketMQ 在 Topic 智能化演进和 Lite-Topic 轻量级消费模式上的创新,以及如何利用这些特性实现 Agent 间的异步通信、上下文隔离、状态恢复与任务编排。通过实际案例,展示了 RocketMQ 如何支持 Multi-Agent 的任务调度,为构建可靠、可控的异步智能体协作系统提供了技术路径。

🚀 **Agentic AI 时代的需求与挑战**:随着大模型能力提升,AI 正从被动响应转向主动决策和自主执行。Multi-Agent 架构应运而生,但实现高效可靠的协同,关键在于 Agent 能力的发现和任务的闭环。传统消息中间件的“发完即忘”模式难以满足 Agent 间需要反馈的异步通信需求。

💡 **RocketMQ Topic 的智能化演进**:RocketMQ 将 Topic 从单纯的数据通道升级为业务意图与能力语义的载体。通过引入自然语言描述和结构化元数据,Topic 具备了“自我表达能力”,使得 Agent 能够基于语义理解进行动态路由和智能分工,解决了 Agent 间“调用谁”的问题。

✨ **Lite-Topic 实现轻量级异步反馈**:Lite-Topic 作为一种无需预创建 Topic 和订阅关系的新类型 Topic,支持粒度更细的资源隔离和动态临时的订阅关系管理。结合 InterestSet 和 ReadySet 的事件驱动模型,它实现了“Pull 模型 + Push 语义”,能够高效、低延迟地将 Sub Agent 的结果反馈给 Supervisor Agent,确保了任务闭环。

🤝 **基于 RocketMQ 构建异步 Multi-Agent 系统流程**:通过语义化 Topic 实现能力注册与发现,Supervisor Agent 基于大模型进行任务拆解与规划,并利用 Lite-Topic 创建专属回调通道进行轻量级异步任务分发与反馈。Supervisor Agent 订阅 Lite-Topic 聚合结果,驱动下一轮推理,形成闭环驱动的持续决策循环,最终实现从静态流程到自主协作的跃迁。

作者:周礼

本文整理自阿里云智能集团高级技术专家周礼在 2025 全球机器学习技术大会上的精彩演讲《Apache RocketMQ x AI:面向异步化 Agent 的事件驱动架构》,介绍了如何基于 Apache RocketMQ 新特性构建异步化 Multi-Agent 系统,深入探讨了 Agent 间的异步通信、上下文隔离、状态恢复与任务编排机制,并通过实际案例展示如何利用 RocketMQ 实现 Multi-Agent 的任务调度。

Multi-Agent 协同的核心:Agent 能力发现与任务闭环

随着大模型能力提升与推理成本下降,MCP、A2A 等协作协议的成熟,AI 迈入了 Agentic AI 的时代,AI 应用也从“被动响应”进入了“主动决策、自主执行”阶段。这一演进催生了 Multi-Agent 架构:任务由多个专业化 Agent 协同完成,不再依赖单一模型或固定流程,开发者得以在模型自主性与业务可控性之间实现平衡。

与传统应用的固定编排不同,Agentic AI 具备自主规划能力,可将目标拆解为动态步骤,但规划又依赖每一步的结果反馈,依据完整的结果链路,Supervisor 才能掌握进展、评估成效,决定下一步行动。因此,要实现真正高效、可靠的协同,仅靠大模型的推理能力远远不够,Agent 之间的协作有两个关键点:能力发现与任务闭环。

1、Agent 能力发现

Agent 的能力发现有两个主要功能:

1. 动态注册 Agent 的能力(如“我能做数据分析”、“我擅长文案生成”);

2. 支持 Supervisor Agent 在运行时查询并选择合适的 Sub Agent 执行任务。

若没有 Agent 能力发现的功能,Agent 协作就只能依赖硬编码,丧失了自主性与扩展性。这一机制可以类比传统微服务中的服务发现与地址查找,调用方依赖注册中心查询服务地址,查找的条件是编码的 ServiceName。Agent 能力的发现与之相似而又有不同,它是面向语义的能力和意图驱动匹配,而能力识别和匹配又是交给大模型处理的,这个过程是实现智能分工的前提。

2、任务协同

在大模型(LLM)驱动的多 Agent 系统中,多个智能体(Agent)通过协作、竞争或分工完成复杂任务。尤其在 Supervisor Agent 架构中,Supervisor 作为系统的“大脑”,通过高层次的协调与管理,将多个专业化 Agent 组织成一个有机整体,从而完成单个 Agent 无法胜任的复杂任务。在 Agent 能力的声明与发现的基础上,为了实现 Supervisor 与其他 Agent 间的高效协同,需要设计合理的通信机制。不同的通信模式适用于不同场景,在灵活性、可扩展性、控制力和性能之间各有取舍。

我们考察基于发布/订阅模式实现 Agent 间异步调用的场景,Sub Agent 接收任务并完成后,需要将结果反馈给 Supervisor,并附带上下文唯一标识(如 Task_ID),以便 Supervisor 异步接收反馈并驱动下一步决策。但要保证 Supervisor Agent 节点异步获取到上次任务的结果,需要在异步场景下实现反馈机制,常见方案如下:

    独占队列: 每个 Supervisor 实例绑定独立 Queue 或者 Topic 来接收下游回写的结果 —— 资源开销大,管理复杂,存在性能瓶颈;广播过滤: Supervisor 集群共享消费分组,所有实例接收全部消息并自行过滤 —— 产生大量无效流量,浪费资源且有稳定性风险;共享存储: 结果存储在数据库或者缓存 —— 更灵活可靠,但每次 Supervisor Agent 作为发起方,需要不停进行轮询以确定自己发起的 Task 是否已经有结果产生,增加了存储成本与交互成本,实际类似于上述的轮询式通信。

可以看到,基于发布/订阅模式实现通信时比较复杂,其核心原因是主流的分布式消息中间件主要面向的是静态编排的业务场景,采用“发完即忘”(Fire-and-Forget)模式,不关心下游的反馈,这使得通信链路难以完成闭环。

RocketMQ 面向 Agentic AI 的新特性

接下来,我们探讨如何实现 Agent 的异步通信机制和动态决策,RocketMQ 在传统模式的基础上进行了针对性设计,推出带语义的 Topic 和 Lite-Topic 的新特性:以 Topic 语义作为能力注册与发现的基础,解决调用谁的问题;以 Lite Topic 动态绑定任务并等待结果消息,解决调用后异步获取结果的问题。 两者结合,以更简洁的方式实现需要反馈的异步任务驱动模式。

1、从数据通道到语义载体:Topic 的智能化演进

在传统的消息系统中,Topic 仅作为数据传输的通道存在,它定义了“消息发往哪里”,但无法表达“为什么发”或“谁需要它”。然而,在 Multi-Agent 协同场景下,通信不再只是简单的数据搬运,而是意图驱动的智能协作过程。为此,我们重新定义了 Topic:它不仅是消息的主题命名和分类,更是业务意图与能力语义的载体。

通过将自然语言描述与结构化元数据引入 Topic 定义,每个 Topic 不再只是一个主题名称,而是一个具备“自我表达能力”的协作单元。例如,一个实现了 A2A 协议中 AppCard 标准的 Topic 格式如下:

这样的设计使得 Topic 具备了可读性、可发现性与可推理性。结合 Nameserver 的服务注册与发现机制,这些带有语义标签的 Topic 可被统一索引和查询。每一个 Agent 可以通过订阅某个能描述自身能力的 Topic 来实现绑定关系,具备注册和被发现的能力,上层 Agent(如 Supervisor Agent)可通过能力关键词(如“数据分析”“内容生成”)动态发现并使用合适的 Topic 来异步驱动下游的 Agent,在任务编排过程中,Supervisor Agent 能够像调用函数一样选择 Topic,实现基于语义理解的动态路由决策。

2、轻量级消费模式:Lite-Topic

Lite-Topic 是在 RocketMQ 百万队列基础上设计的一种新类型的 Topic,它无需预创建 Topic 和订阅关系,并且能自动管理生命周期,主要面向短期、小量消息传输、客户端订阅关系动态临时变化、订阅集合高度个性化的场景。这种轻量化消费模型天然就能支持粒度更细的资源隔离,从而支持异步场景下的结果反馈机制,保证 Sub Agent 回写的结果能让发起任务的 Supervisor Agent 获取到。

为了维护这种“千人千面”的订阅关系,我们提出一种去中心化 + 最终一致性的订阅关系管理方式:将订阅关系提前注册到 Broker,避免每次请求重复传输,以增量注册方式应对新的模型中 Lite Topic 订阅的频繁变化,以全量方式做到最终一致性。同时,在组织方式上不再以 Group 为维度,而是以 Client_ID 为维度管理的订阅关系,可以称之为每个客户端的兴趣集(InterestSet)。

不同客户端的订阅集合维护在服务侧:

在消息读取方面:不再使用传统 Pull / Pop 模型中客户端针对每个 Topic 的每个 Queue 发起读请求的模式,因为这会带来数千个并发请求,连接与线程开销会线性增长。读请求不需携带 Topic,而只带上自己的身份即可,这样保证即使订阅集合庞大,读请求依然是轻量的。因为每个 Lite-Topic 的消息量并不大(几条到几百条),发送流量也不高甚至较为离散,但是 Topic 数量多,所以需要一种新的分发机制,在仍然保证低延迟的同时,降低 1:1 读模式带来的开销。

我们引入一个事件驱动的消息分发方式,核心组件是 ReadySet(就绪集合),就绪事件集合是每个 Client_ID 的待读取 Topic 队列集合,维护客户端的 Ready 事件,即“哪些 Topic 有消息可读”,存放当前有消息可读的 Lite-Topic。在 RocketMQ 中能触发这个集合变更的事件如下:

事件源描述
新消息写入事件Producer 发送消息 → 匹配 InterestSet → 加入对应 client 的 ReadySet
ACK 事件消费者确认部分消息后,若队列仍有未读消息
OrderLock 释放事件顺序消费锁释放后,下一消息变为可读 → 触发加入
订阅上线事件客户端完成注册,若有消息 → 立即唤醒

这种直接访问 ReadySet → 仅处理活跃 topic 的读取方式,避免了每次读请求遍历客户端所有订阅集合进而轮询每个 Lite Topic 带来的无效读操作。换个角度看,这个模型是:Pull 模型 + Push 语义

从整体上看,通过引入 InterestSet + ReadySet 的事件驱动模型,InterestSet 维护客户端的订阅关系,并在事件触发后分发到对应的 ReadySet,将传统“盲目轮询”转化为“精准唤醒”,最终实现在大规模个性化订阅场景下的高效、低延迟消息分发。

基于 RocketMQ 构建异步 Multi-Agent 系统

首先,通过上述 Lite-Topic 的能力,我们可以在 Multi-Agent 中更简洁地异步获取 Sub Agent 的结果,Supervisor Agent 集群中,任何一个 Supervisor Agent 都可以通过动态订阅 Lite-Topic(以 Task_id 命名)来接收下游任务结果,实现整个任务的闭环。

其次,再结合语义化 Topic 的 Agent 能力注册与发现,我们构建了一套面向 Agentic AI 的高效异步协同架构。其核心业务流程如下:

    能力注册与发现: 每个 Sub Agent 在启动时创建与其业务职责对应的 Topic,并将协议规范、输入输出 Schema、自然语言描述等元数据注册至 NameServer。通过持续订阅该 Topic 接收任务,Sub Agent 不仅完成了通信接入,更实现了能力的主动暴露。语义驱动的任务编排: Supervisor Agent 基于用户目标构建 Prompt 上下文,并动态查询 NameServer 获取当前可用的 Topic 列表,将其作为“可调用函数库”注入大模型。LLM 由此可在真实、可观测的能力空间中进行任务拆解与路径规划,避免了“幻觉式决策”,提升了执行的可行性与可控性。轻量级异步任务分发与反馈: 在执行阶段,Supervisor 向目标 Topic 发送消息,同时为本次调用创建一个临时的 Lite-Topic 作为专属回调通道。该机制无需绑定具体实例,即可实现高并发下的结果路由,兼顾性能与灵活性。闭环驱动的持续决策: Supervisor 订阅相关 Lite-Topic,异步聚合各子任务的执行结果,重新注入上下文,驱动下一轮推理与编排。整个过程形成一个以反馈为核心、动态演进的决策循环,真正实现了从“静态流程”到“自主协作”的跃迁。

这套依托 RocketMQ 在发布/订阅模型上的创新扩展而实现的架构,在保证系统松耦合和高扩展性的同时,有效支持了 Multi-Agent 场景下任务编排、结果反馈和多轮决策的需求。基于 RocketMQ 的这一实践,为构建可靠、可控的异步智能体协作系统提供了一种可行的技术路径。

展望未来,Apache RocketMQ 将持续在 AI 领域进行技术迭代与创新。诚邀您点击此处参与问卷调研,反馈真实使用场景和痛点,帮助我们打造更符合 AI 时代需求的消息引擎。也欢迎钉钉搜索加入 RocketMQ for AI 用户交流群(群号:110085036316),与我们交流探讨~

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Apache RocketMQ Multi-Agent Agentic AI 事件驱动架构 异步通信 任务编排 大模型
相关文章