Skip to content

LangGraph 并行节点执行详细解读

📚 概述

本文档详细解读 LangGraph 中的并行节点执行(Parallelization)机制。这是构建高效多智能体系统的核心技术之一,能让多个任务同时执行,大幅提升性能。

🎯 核心概念

什么是并行执行?

在 LangGraph 中,并行执行指的是让多个节点在同一时间步(step)内同时运行,而不是按顺序一个接一个执行。这种模式常被称为 Fan-out(扇出)和 Fan-in(扇入)

  • Fan-out(扇出):一个节点的输出分发到多个并行节点
  • Fan-in(扇入):多个并行节点的输出汇聚到一个节点

🔧 基础示例:线性执行 vs 并行执行

1. 线性执行图(Sequential)

首先看一个简单的线性执行示例:

python
from typing import Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    state: str

class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['state']}")
        return {"state": [self._value]}

# 构建图
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# 线性流程:START → a → b → c → d → END
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", "c")
builder.add_edge("c", "d")
builder.add_edge("d", END)

graph = builder.compile()

执行结果:

Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm B"]
Adding I'm D to ["I'm C"]

{'state': ["I'm D"]}

分析: 每个节点按顺序执行,后一个节点会覆盖前一个节点的状态。


2. 并行执行图(Parallel)- 遇到问题!

现在让我们尝试让 bc 并行执行:

python
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# 并行流程:a 扇出到 b 和 c,然后扇入到 d
builder.add_edge(START, "a")
builder.add_edge("a", "b")  # a → b
builder.add_edge("a", "c")  # a → c(并行)
builder.add_edge("b", "d")  # b → d
builder.add_edge("c", "d")  # c → d
builder.add_edge("d", END)

graph = builder.compile()

执行时会报错!

Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
An error occurred: At key 'state': Can receive only one value per step.
Use an Annotated key to handle multiple values.

为什么会报错?

因为在同一个时间步(step)内,bc 都试图更新 state 这个键。LangGraph 不知道如何合并这两个更新,所以抛出 InvalidUpdateError 错误。


🔑 解决方案:使用 Reducer

什么是 Reducer?

Reducer 是一个函数,用于定义如何合并多个并行更新。在 Python 中,我们使用 Annotated 类型提示来指定 reducer。

Python 知识点:Annotated 类型提示

Annotated 是 Python 3.9+ 引入的类型提示工具,允许我们为类型添加元数据:

python
from typing import Annotated
import operator

# 语法:Annotated[类型, 元数据]
state: Annotated[list, operator.add]
#      ^^^^^^^^  ^^^^  ^^^^^^^^^^^^^
#      关键字    类型   reducer 函数

使用 operator.add 作为 Reducer

operator.add 是 Python 内置的加法运算符函数。对于列表,它执行列表拼接

python
import operator

# operator.add 对列表的作用
result = operator.add([1, 2], [3, 4])  # 结果: [1, 2, 3, 4]

修复后的代码:

python
import operator
from typing import Annotated

class State(TypedDict):
    # 使用 operator.add 作为 reducer,让 state 支持追加
    state: Annotated[list, operator.add]

# 构建图(结构同上)
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)

graph = builder.compile()

执行结果:

Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B", "I'm C"]

{'state': ["I'm A", "I'm B", "I'm C", "I'm D"]}

✅ 成功! 现在 bc 的更新都被保留了,它们的结果被拼接到 state 列表中。


⏳ 等待并行节点完成

不同长度的并行路径

当并行路径的长度不同时会发生什么?

python
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))  # b 路径的额外节点
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")  # b → b2(更长的路径)
builder.add_edge(["b2", "c"], "d")  # 等待 b2 和 c 都完成
builder.add_edge("d", END)

graph = builder.compile()

执行结果:

Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]

关键点:

  • bb2c 都属于同一个执行步骤(step)
  • 虽然 c 先完成,但图会等待 b2 也完成后,才进入 d 节点
  • 这是 LangGraph 的自动同步机制

🎨 控制状态更新的顺序

问题:默认顺序不可控

在上面的例子中,虽然都是同一步,但状态更新的顺序是:["I'm A", "I'm B", "I'm C", "I'm B2"]

注意 "I'm C" 出现在 "I'm B2" 之前,即使 b2b 的后续节点。

原因: LangGraph 根据图的拓扑结构决定更新顺序,我们无法直接控制。

解决方案:自定义 Reducer

我们可以编写自定义 reducer 来排序状态更新:

python
def sorting_reducer(left, right):
    """合并并排序列表中的值"""
    # 确保 left 是列表
    if not isinstance(left, list):
        left = [left]

    # 确保 right 是列表
    if not isinstance(right, list):
        right = [right]

    # 合并并排序
    return sorted(left + right, reverse=False)

class State(TypedDict):
    # 使用自定义的 sorting_reducer
    state: Annotated[list, sorting_reducer]

执行结果:

Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm B2", "I'm C"]

{'state': ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"]}

注意: 现在顺序变成了 ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"],按字母顺序排列!

🔍 Reducer 函数详解

自定义 reducer 的工作原理:

python
def sorting_reducer(left, right):
    # left: 当前状态的值
    # right: 新节点返回的值

    # 1. 标准化为列表
    if not isinstance(left, list):
        left = [left]
    if not isinstance(right, list):
        right = [right]

    # 2. 合并并处理(这里是排序)
    return sorted(left + right, reverse=False)

其他 reducer 示例:

python
# 只保留最新的 N 个值
def keep_last_n(n=5):
    def reducer(left, right):
        combined = (left if isinstance(left, list) else [left]) + \
                   (right if isinstance(right, list) else [right])
        return combined[-n:]
    return reducer

# 去重
def unique_reducer(left, right):
    combined = (left if isinstance(left, list) else [left]) + \
               (right if isinstance(right, list) else [right])
    return list(dict.fromkeys(combined))  # 保持顺序的去重

🤖 实战案例:并行检索生成答案

场景

我们要构建一个问答系统,同时从两个来源获取信息:

  1. Wikipedia - 获取百科知识
  2. Web Search (Tavily) - 获取最新网络信息

然后让 LLM 基于这些信息生成答案。

State 定义

python
import operator
from typing import Annotated
from typing_extensions import TypedDict

class State(TypedDict):
    question: str                              # 用户问题
    answer: str                                # 生成的答案
    context: Annotated[list, operator.add]     # 检索到的上下文(支持并行追加)

关键点:

  • context 使用 operator.add 作为 reducer,允许两个检索节点并行写入

节点实现

1. Web 搜索节点

python
from langchain_community.tools import TavilySearchResults

def search_web(state):
    """从网络搜索获取文档"""

    # 使用 Tavily 搜索
    tavily_search = TavilySearchResults(max_results=3)
    search_docs = tavily_search.invoke(state['question'])

    # 格式化搜索结果
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
            for doc in search_docs
        ]
    )

    return {"context": [formatted_search_docs]}

知识点:

  • 返回 {"context": [formatted_search_docs]},注意 context 的值是一个列表
  • 这样 operator.add 可以将其拼接到现有的 context 中

2. Wikipedia 搜索节点

python
from langchain_community.document_loaders import WikipediaLoader

def search_wikipedia(state):
    """从 Wikipedia 获取文档"""

    # 搜索 Wikipedia
    search_docs = WikipediaLoader(
        query=state['question'],
        load_max_docs=2
    ).load()

    # 格式化搜索结果
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ]
    )

    return {"context": [formatted_search_docs]}

3. 生成答案节点

python
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def generate_answer(state):
    """基于上下文生成答案"""

    # 获取状态
    context = state["context"]
    question = state["question"]

    # 构建提示词
    answer_template = """Answer the question {question} using this context: {context}"""
    answer_instructions = answer_template.format(
        question=question,
        context=context
    )

    # 调用 LLM 生成答案
    answer = llm.invoke([
        SystemMessage(content=answer_instructions),
        HumanMessage(content="Answer the question.")
    ])

    return {"answer": answer}

构建并行图

python
from langgraph.graph import StateGraph, START, END

builder = StateGraph(State)

# 添加节点
builder.add_node("search_web", search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)

# 并行流程
builder.add_edge(START, "search_wikipedia")  # START → search_wikipedia
builder.add_edge(START, "search_web")        # START → search_web(并行)
builder.add_edge("search_wikipedia", "generate_answer")  # 汇聚
builder.add_edge("search_web", "generate_answer")        # 汇聚
builder.add_edge("generate_answer", END)

graph = builder.compile()

流程图:

          START
         /     \
        /       \
  search_web  search_wikipedia
        \       /
         \     /
    generate_answer
           |
          END

执行查询

python
result = graph.invoke({
    "question": "How were Nvidia's Q2 2024 earnings"
})

print(result['answer'].content)

输出示例:

Nvidia's Q2 2024 earnings were strong, showcasing record revenue and a robust
performance in its data center division. The company reported revenue of $30.0
billion, which was up 15% from the previous quarter and up 122% from a year ago.
GAAP earnings per diluted share were $0.67, up 12% from the previous quarter and
up 168% from a year ago, while non-GAAP earnings per diluted share were $0.68...

🎓 核心知识点总结

LangGraph 特有概念

  1. 并行执行模式

    • Fan-out:一个节点扇出到多个节点
    • Fan-in:多个节点汇聚到一个节点
  2. Reducer 机制

    • 必须使用 reducer 处理并行更新同一 state 键的情况
    • operator.add 用于列表拼接
    • 可以自定义 reducer 控制合并逻辑
  3. 执行步骤(Step)

    • 并行节点在同一个 step 中执行
    • 图会等待所有并行路径完成后再进入下一步
  4. 状态更新顺序

    • 默认顺序由 LangGraph 根据图拓扑决定
    • 可通过自定义 reducer 控制顺序

Python 特有知识点

  1. TypedDict

    python
    from typing_extensions import TypedDict
    
    class State(TypedDict):
        field1: str
        field2: int

    用于定义字典的类型结构

  2. Annotated 类型提示

    python
    from typing import Annotated
    
    # 语法:Annotated[类型, 元数据...]
    field: Annotated[list, operator.add]

    为类型添加额外的元数据

  3. operator.add

    python
    import operator
    
    # 对列表执行拼接
    operator.add([1, 2], [3, 4])  # [1, 2, 3, 4]
  4. 可调用类(Callable Class)

    python
    class ReturnNodeValue:
        def __init__(self, value):
            self._value = value
    
        def __call__(self, state):  # 实现 __call__ 方法
            return {"state": [self._value]}

    实现 __call__ 方法使对象可以像函数一样调用


💡 最佳实践

1. 何时使用并行执行?

适用场景:

  • 多个独立的数据源检索(如同时查询数据库、API、搜索引擎)
  • 多个独立的处理任务(如同时进行文本分类、实体提取、情感分析)
  • 需要多个 Agent 同时工作的场景

不适用场景:

  • 任务之间有依赖关系
  • 需要严格的执行顺序
  • 资源受限(如 API 调用限制)

2. Reducer 选择指南

场景Reducer说明
收集所有结果operator.add拼接所有更新
需要排序自定义 sorting_reducer合并后排序
去重自定义 unique_reducer只保留唯一值
只保留最新自定义 lambda left, right: right覆盖模式
保留最近 N 个自定义 keep_last_n滑动窗口

3. 状态设计建议

python
class State(TypedDict):
    # 输入字段(不会被并行更新)
    question: str

    # 并行收集的数据(使用 reducer)
    search_results: Annotated[list, operator.add]

    # 最终输出(单一节点更新)
    answer: str

🚀 进阶技巧

稳定排序(Stable Sorting)

对于需要精确控制顺序的场景,可以使用"临时字段 + 汇聚节点"模式:

python
class State(TypedDict):
    question: str
    temp_results: dict  # 临时存储并行结果
    final_results: list  # 排序后的最终结果

def collect_results(state):
    """汇聚节点:合并并排序临时结果"""
    results = state['temp_results']
    # 按自定义顺序排序
    ordered = [results['source1'], results['source2'], results['source3']]
    return {"final_results": ordered, "temp_results": {}}  # 清空临时字段

详见 官方文档


📖 扩展阅读


总结:并行执行是 LangGraph 的强大功能,通过合理使用 reducer 和图结构设计,可以构建高效的多任务、多智能体系统。掌握这个概念是进入高级 LangGraph 开发的关键一步!

基于 MIT 许可证发布。内容版权归作者所有。