前言
在复杂的工作流系统中,经常需要处理来自多个并行执行路径的状态更新。LangGraph提供了一种强大的机制——Reducer函数,来解决这个问题。Reducer函数允许我们合并来自不同节点的状态更新,确保数据的完整性和一致性。本文将通过分析03_reducer_functions.py示例,深入讲解Reducer函数的概念、实现方式和在并行执行场景中的应用。
Reducer函数基础概念
Reducer函数是一种特殊的函数,用于合并两个状态值(通常是来自不同节点的更新)。在LangGraph中,Reducer函数主要用于以下场景:
- 并行执行:当多个节点并行执行并更新同一状态字段时状态合并:将来自不同来源的状态更新合并为一个统一的状态数据聚合:对分散的数据进行汇总和整合
Reducer函数的基本形式是接收两个参数(当前值和新值),并返回一个合并后的值。
示例代码
from typing_extensions import TypedDict, Annotatedfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.checkpoint.memory import InMemorySaverprint("======= 示例3: 使用reducer函数合并状态 =======")# 定义reducer函数def list_reducer(a: list, b: list | None) -> list: """合并两个列表的reducer函数""" if b is None: return a return a + b# 定义状态类型,使用Annotated指定reducerclass ReducerState(TypedDict): numbers: Annotated[list, list_reducer] sum: int# 定义节点函数def generate_numbers1(state: ReducerState) -> dict: """生成第一批数字""" return {"numbers": [1, 2, 3]}def generate_numbers2(state: ReducerState) -> dict: """生成第二批数字""" return {"numbers": [4, 5, 6]}def calculate_sum(state: ReducerState) -> dict: """计算数字总和""" total = sum(state["numbers"]) return {"sum": total}# 创建StateGraphreducer_graph = StateGraph(ReducerState)# 添加节点reducer_graph.add_node("generate1", generate_numbers1)reducer_graph.add_node("generate2", generate_numbers2)reducer_graph.add_node("calculate_sum", calculate_sum)# 添加边 - 并行执行两个生成节点reducer_graph.add_edge(START, "generate1")reducer_graph.add_edge(START, "generate2")reducer_graph.add_edge("generate1", "calculate_sum")reducer_graph.add_edge("generate2", "calculate_sum")reducer_graph.add_edge("calculate_sum", END)# 编译图(使用checkpointer来支持并行执行)checkpointer = InMemorySaver()compiled_reducer_graph = reducer_graph.compile(checkpointer=checkpointer)# 执行图result = compiled_reducer_graph.invoke({"numbers": [], "sum": 0}, config={"configurable": {"thread_id": "1"}})print(f"输入: {{'numbers': [], 'sum': 0}}")print(f"输出: {result}")# 示例说明:# 1. 这个示例展示了如何使用reducer函数合并来自不同节点的状态更新# 2. 通过Annotated类型标注和自定义的list_reducer函数,实现了列表的合并操作# 3. 图中从START节点同时连接到两个生成节点,展示了并行执行的能力# 4. 使用checkpointer来支持状态的持久化和并行执行# 5. 最终calculate_sum节点接收到合并后的完整列表并计算总和输出结果
======= 示例3: 使用reducer函数合并状态 =======输入: {'numbers': [], 'sum': 0}输出: {'numbers': [1, 2, 3, 4, 5, 6], 'sum': 21}代码解析:Reducer函数与并行执行
1. 定义Reducer函数
def list_reducer(a: list, b: list | None) -> list: """合并两个列表的reducer函数""" if b is None: return a return a + b这个list_reducer函数是一个简单但典型的reducer函数实现:
- 它接收两个参数:当前值
a和新值b如果新值b为None,则返回当前值a否则,将两个列表合并并返回结果在实际应用中,reducer函数的逻辑可以根据业务需求变得更加复杂,例如合并字典、更新计数器等。
2. 定义带Reducer的状态类型
class ReducerState(TypedDict): numbers: Annotated[list, list_reducer] sum: int这里使用Annotated类型标注来将reducer函数与状态字段关联起来:
numbers字段是一个列表,使用list_reducer作为其reducer函数sum字段是一个整数,没有指定reducer函数(将使用默认的覆盖行为)通过这种方式,LangGraph知道当多个节点尝试同时更新numbers字段时,应该使用list_reducer函数来合并这些更新,而不是简单地覆盖。
3. 定义节点函数
def generate_numbers1(state: ReducerState) -> dict: """生成第一批数字""" return {"numbers": [1, 2, 3]}def generate_numbers2(state: ReducerState) -> dict: """生成第二批数字""" return {"numbers": [4, 5, 6]}def calculate_sum(state: ReducerState) -> dict: """计算数字总和""" total = sum(state["numbers"]) return {"sum": total}这里定义了三个节点函数:
generate_numbers1:生成第一批数字[1, 2, 3]并更新numbers字段generate_numbers2:生成第二批数字[4, 5, 6]并更新numbers字段calculate_sum:计算numbers列表中所有元素的总和并更新sum字段特别注意前两个函数都在更新同一个numbers字段,这就是为什么我们需要reducer函数来合并它们的更新。
4. 创建并行执行的图结构
# 创建StateGraphreducer_graph = StateGraph(ReducerState)# 添加节点reducer_graph.add_node("generate1", generate_numbers1)reducer_graph.add_node("generate2", generate_numbers2)reducer_graph.add_node("calculate_sum", calculate_sum)# 添加边 - 并行执行两个生成节点reducer_graph.add_edge(START, "generate1")reducer_graph.add_edge(START, "generate2")reducer_graph.add_edge("generate1", "calculate_sum")reducer_graph.add_edge("generate2", "calculate_sum")reducer_graph.add_edge("calculate_sum", END)这部分代码创建了一个具有并行执行路径的图结构:
- 从
START节点同时连接到generate1和generate2节点,这意味着这两个节点将并行执行然后,两个生成节点都连接到calculate_sum节点最后,calculate_sum节点连接到END节点这种结构允许我们同时执行多个任务,然后在它们完成后聚合结果,非常适合处理可以并行的独立任务。
5. 配置Checkpointer支持并行执行
# 编译图(使用checkpointer来支持并行执行)checkpointer = InMemorySaver()compiled_reducer_graph = reducer_graph.compile(checkpointer=checkpointer)为了支持并行执行和状态合并,我们需要使用一个checkpointer。在这个例子中,我们使用了InMemorySaver作为checkpointer,它将状态保存在内存中。在实际应用中,你也可以使用其他类型的checkpointer,如基于文件系统或数据库的checkpointer。
6. 执行图并指定线程ID
# 执行图result = compiled_reducer_graph.invoke({"numbers": [], "sum": 0}, config={"configurable": {"thread_id": "1"}})print(f"输入: {{'numbers': [], 'sum': 0}}")print(f"输出: {result}")执行图时,我们需要提供一个初始状态和一个配置对象。配置对象中的thread_id非常重要,它用于标识当前执行线程,确保来自不同并行路径的状态更新能够正确地合并。
执行流程分析
让我们详细分析一下整个图的执行流程:
- 初始化:
invoke()方法接收初始状态{"numbers": [], "sum": 0}和配置{"configurable": {"thread_id": "1"}}并行执行生成节点:从START节点同时执行generate1和generate2节点generate1返回{"numbers": [1, 2, 3]}generate2返回{"numbers": [4, 5, 6]}list_reducer函数合并两个numbers更新,得到[1, 2, 3, 4, 5, 6]执行计算节点:两个生成节点都完成后,执行calculate_sum节点,计算合并后列表的总和(21)结束:从calculate_sum节点连接到END节点,执行结束并返回最终状态为什么使用Reducer函数?
Reducer函数在以下场景中特别有用:
- 并行数据处理:当多个节点并行处理数据并需要合并结果时状态一致性:确保来自不同来源的状态更新不会相互覆盖数据聚合:将分散的数据点汇总成有意义的整体复杂工作流:在复杂的工作流中管理状态的演变
优化
虽然这个示例很好地展示了reducer函数的基础用法,但还有一些可以改进的地方:
- 处理更复杂的数据结构:可以为更复杂的数据结构实现reducer函数
def dict_reducer(a: dict, b: dict | None) -> dict: """合并两个字典的reducer函数""" if b is None: return a result = a.copy() result.update(b) return result # 在状态类型中使用class ComplexState(TypedDict): data: Annotated[dict, dict_reducer]Reducer函数的实际应用场景
Reducer函数在实际应用中有广泛的用途:
- 数据收集:从多个来源收集数据并合并多步骤处理:在多步骤处理流程中累积中间结果并行计算:执行并行计算并合并结果分布式系统:在分布式系统中协调状态更新历史记录:维护状态的历史记录或变更日志
总结
通过本文的学习,我们了解了LangGraph中reducer函数的概念、实现方式和在并行执行场景中的应用。Reducer函数是处理复杂工作流中状态合并的强大工具,它允许我们安全地合并来自不同节点的状态更新,确保数据的完整性和一致性。
这个示例展示了如何使用reducer函数合并两个并行执行节点生成的列表数据,但reducer函数的应用远不止于此。在实际应用中,你可以根据业务需求实现各种复杂的reducer逻辑,构建更加灵活和强大的工作流系统。
