掘金 人工智能 09月04日
LangGraph Reducer函数详解
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

LangGraph的Reducer函数是一种强大的机制,用于在并行执行路径中合并来自多个节点的状态更新。本文通过分析03_reducer_functions.py示例,深入讲解了Reducer函数的概念、实现方式和在并行执行场景中的应用。Reducer函数主要解决并行执行时状态合并的问题,确保数据的完整性和一致性。示例展示了如何使用Reducer函数合并两个并行节点生成的列表数据,并计算总和。Reducer函数在并行数据处理、状态一致性、数据聚合和复杂工作流管理中特别有用。

🔹 Reducer函数是一种特殊的函数,用于合并两个状态值(通常来自不同节点的更新),在LangGraph中主要用于并行执行场景,确保状态合并的完整性和一致性。

🔸 示例中定义了list_reducer函数,该函数接收两个列表参数,如果新值为None则返回当前值,否则将两个列表合并后返回,实现了列表的合并操作。

🔶 通过Annotated类型标注和自定义的list_reducer函数,状态类型ReducerState中的numbers字段被设置为使用该Reducer函数合并更新,而不是简单地覆盖原有值。

🚀 在图结构中,从START节点同时连接到两个生成节点generate1和generate2,展示了并行执行的能力,两个生成节点都更新同一个numbers字段,需要Reducer函数来合并它们的更新。

🔄 使用InMemorySaver作为checkpointer支持状态的持久化和并行执行,确保来自不同并行路径的状态更新能够正确地合并,通过指定thread_id来标识当前执行线程。

前言

在复杂的工作流系统中,经常需要处理来自多个并行执行路径的状态更新。LangGraph提供了一种强大的机制——Reducer函数,来解决这个问题。Reducer函数允许我们合并来自不同节点的状态更新,确保数据的完整性和一致性。本文将通过分析03_reducer_functions.py示例,深入讲解Reducer函数的概念、实现方式和在并行执行场景中的应用。

Reducer函数基础概念

Reducer函数是一种特殊的函数,用于合并两个状态值(通常是来自不同节点的更新)。在LangGraph中,Reducer函数主要用于以下场景:

    并行执行:当多个节点并行执行并更新同一状态字段时状态合并:将来自不同来源的状态更新合并为一个统一的状态数据聚合:对分散的数据进行汇总和整合

Reducer函数的基本形式是接收两个参数(当前值和新值),并返回一个合并后的值。

示例代码

from typing_extensions import TypedDict, Annotatedfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.checkpoint.memory import InMemorySaverprint("======= 示例3: 使用reducer函数合并状态 =======")# 定义reducer函数def list_reducer(a: list, b: list | None) -> list:    """合并两个列表的reducer函数"""    if b is None:        return a    return a + b# 定义状态类型,使用Annotated指定reducerclass ReducerState(TypedDict):    numbers: Annotated[list, list_reducer]    sum: int# 定义节点函数def generate_numbers1(state: ReducerState) -> dict:    """生成第一批数字"""    return {"numbers": [1, 2, 3]}def generate_numbers2(state: ReducerState) -> dict:    """生成第二批数字"""    return {"numbers": [4, 5, 6]}def calculate_sum(state: ReducerState) -> dict:    """计算数字总和"""    total = sum(state["numbers"])    return {"sum": total}# 创建StateGraphreducer_graph = StateGraph(ReducerState)# 添加节点reducer_graph.add_node("generate1", generate_numbers1)reducer_graph.add_node("generate2", generate_numbers2)reducer_graph.add_node("calculate_sum", calculate_sum)# 添加边 - 并行执行两个生成节点reducer_graph.add_edge(START, "generate1")reducer_graph.add_edge(START, "generate2")reducer_graph.add_edge("generate1", "calculate_sum")reducer_graph.add_edge("generate2", "calculate_sum")reducer_graph.add_edge("calculate_sum", END)# 编译图(使用checkpointer来支持并行执行)checkpointer = InMemorySaver()compiled_reducer_graph = reducer_graph.compile(checkpointer=checkpointer)# 执行图result = compiled_reducer_graph.invoke({"numbers": [], "sum": 0}, config={"configurable": {"thread_id": "1"}})print(f"输入: {{'numbers': [], 'sum': 0}}")print(f"输出: {result}")# 示例说明:# 1. 这个示例展示了如何使用reducer函数合并来自不同节点的状态更新# 2. 通过Annotated类型标注和自定义的list_reducer函数,实现了列表的合并操作# 3. 图中从START节点同时连接到两个生成节点,展示了并行执行的能力# 4. 使用checkpointer来支持状态的持久化和并行执行# 5. 最终calculate_sum节点接收到合并后的完整列表并计算总和

输出结果

======= 示例3: 使用reducer函数合并状态 =======输入: {'numbers': [], 'sum': 0}输出: {'numbers': [1, 2, 3, 4, 5, 6], 'sum': 21}

代码解析:Reducer函数与并行执行

1. 定义Reducer函数

def list_reducer(a: list, b: list | None) -> list:    """合并两个列表的reducer函数"""    if b is None:        return a    return a + b

这个list_reducer函数是一个简单但典型的reducer函数实现:

在实际应用中,reducer函数的逻辑可以根据业务需求变得更加复杂,例如合并字典、更新计数器等。

2. 定义带Reducer的状态类型

class ReducerState(TypedDict):    numbers: Annotated[list, list_reducer]    sum: int

这里使用Annotated类型标注来将reducer函数与状态字段关联起来:

通过这种方式,LangGraph知道当多个节点尝试同时更新numbers字段时,应该使用list_reducer函数来合并这些更新,而不是简单地覆盖。

3. 定义节点函数

def generate_numbers1(state: ReducerState) -> dict:    """生成第一批数字"""    return {"numbers": [1, 2, 3]}def generate_numbers2(state: ReducerState) -> dict:    """生成第二批数字"""    return {"numbers": [4, 5, 6]}def calculate_sum(state: ReducerState) -> dict:    """计算数字总和"""    total = sum(state["numbers"])    return {"sum": total}

这里定义了三个节点函数:

特别注意前两个函数都在更新同一个numbers字段,这就是为什么我们需要reducer函数来合并它们的更新。

4. 创建并行执行的图结构

# 创建StateGraphreducer_graph = StateGraph(ReducerState)# 添加节点reducer_graph.add_node("generate1", generate_numbers1)reducer_graph.add_node("generate2", generate_numbers2)reducer_graph.add_node("calculate_sum", calculate_sum)# 添加边 - 并行执行两个生成节点reducer_graph.add_edge(START, "generate1")reducer_graph.add_edge(START, "generate2")reducer_graph.add_edge("generate1", "calculate_sum")reducer_graph.add_edge("generate2", "calculate_sum")reducer_graph.add_edge("calculate_sum", END)

这部分代码创建了一个具有并行执行路径的图结构:

这种结构允许我们同时执行多个任务,然后在它们完成后聚合结果,非常适合处理可以并行的独立任务。

5. 配置Checkpointer支持并行执行

# 编译图(使用checkpointer来支持并行执行)checkpointer = InMemorySaver()compiled_reducer_graph = reducer_graph.compile(checkpointer=checkpointer)

为了支持并行执行和状态合并,我们需要使用一个checkpointer。在这个例子中,我们使用了InMemorySaver作为checkpointer,它将状态保存在内存中。在实际应用中,你也可以使用其他类型的checkpointer,如基于文件系统或数据库的checkpointer。

6. 执行图并指定线程ID

# 执行图result = compiled_reducer_graph.invoke({"numbers": [], "sum": 0}, config={"configurable": {"thread_id": "1"}})print(f"输入: {{'numbers': [], 'sum': 0}}")print(f"输出: {result}")

执行图时,我们需要提供一个初始状态和一个配置对象。配置对象中的thread_id非常重要,它用于标识当前执行线程,确保来自不同并行路径的状态更新能够正确地合并。

执行流程分析

让我们详细分析一下整个图的执行流程:

    初始化invoke()方法接收初始状态{"numbers": [], "sum": 0}和配置{"configurable": {"thread_id": "1"}}并行执行生成节点:从START节点同时执行generate1generate2节点
      generate1返回{"numbers": [1, 2, 3]}generate2返回{"numbers": [4, 5, 6]}
    合并状态更新:使用list_reducer函数合并两个numbers更新,得到[1, 2, 3, 4, 5, 6]执行计算节点:两个生成节点都完成后,执行calculate_sum节点,计算合并后列表的总和(21)结束:从calculate_sum节点连接到END节点,执行结束并返回最终状态

为什么使用Reducer函数?

Reducer函数在以下场景中特别有用:

    并行数据处理:当多个节点并行处理数据并需要合并结果时状态一致性:确保来自不同来源的状态更新不会相互覆盖数据聚合:将分散的数据点汇总成有意义的整体复杂工作流:在复杂的工作流中管理状态的演变

优化

虽然这个示例很好地展示了reducer函数的基础用法,但还有一些可以改进的地方:

    处理更复杂的数据结构:可以为更复杂的数据结构实现reducer函数
    def dict_reducer(a: dict, b: dict | None) -> dict:    """合并两个字典的reducer函数"""    if b is None:        return a    result = a.copy()    result.update(b)    return result    # 在状态类型中使用class ComplexState(TypedDict):    data: Annotated[dict, dict_reducer]

Reducer函数的实际应用场景

Reducer函数在实际应用中有广泛的用途:

    数据收集:从多个来源收集数据并合并多步骤处理:在多步骤处理流程中累积中间结果并行计算:执行并行计算并合并结果分布式系统:在分布式系统中协调状态更新历史记录:维护状态的历史记录或变更日志

总结

通过本文的学习,我们了解了LangGraph中reducer函数的概念、实现方式和在并行执行场景中的应用。Reducer函数是处理复杂工作流中状态合并的强大工具,它允许我们安全地合并来自不同节点的状态更新,确保数据的完整性和一致性。

这个示例展示了如何使用reducer函数合并两个并行执行节点生成的列表数据,但reducer函数的应用远不止于此。在实际应用中,你可以根据业务需求实现各种复杂的reducer逻辑,构建更加灵活和强大的工作流系统。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

LangGraph Reducer函数 并行执行 状态合并 数据聚合
相关文章