LangGraph 并行节点执行详细解读
📚 概述
本文档详细解读 LangGraph 中的并行节点执行(Parallelization)机制。这是构建高效多智能体系统的核心技术之一,能让多个任务同时执行,大幅提升性能。
🎯 核心概念
什么是并行执行?
在 LangGraph 中,并行执行指的是让多个节点在同一时间步(step)内同时运行,而不是按顺序一个接一个执行。这种模式常被称为 Fan-out(扇出)和 Fan-in(扇入):
- Fan-out(扇出):一个节点的输出分发到多个并行节点
- Fan-in(扇入):多个并行节点的输出汇聚到一个节点
🔧 基础示例:线性执行 vs 并行执行
1. 线性执行图(Sequential)
首先看一个简单的线性执行示例:
from typing import Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
state: str
class ReturnNodeValue:
def __init__(self, node_secret: str):
self._value = node_secret
def __call__(self, state: State) -> Any:
print(f"Adding {self._value} to {state['state']}")
return {"state": [self._value]}
# 构建图
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# 线性流程:START → a → b → c → d → END
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", "c")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
执行结果:
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm B"]
Adding I'm D to ["I'm C"]
{'state': ["I'm D"]}
分析: 每个节点按顺序执行,后一个节点会覆盖前一个节点的状态。
2. 并行执行图(Parallel)- 遇到问题!
现在让我们尝试让 b
和 c
并行执行:
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# 并行流程:a 扇出到 b 和 c,然后扇入到 d
builder.add_edge(START, "a")
builder.add_edge("a", "b") # a → b
builder.add_edge("a", "c") # a → c(并行)
builder.add_edge("b", "d") # b → d
builder.add_edge("c", "d") # c → d
builder.add_edge("d", END)
graph = builder.compile()
执行时会报错!
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
An error occurred: At key 'state': Can receive only one value per step.
Use an Annotated key to handle multiple values.
为什么会报错?
因为在同一个时间步(step)内,b
和 c
都试图更新 state
这个键。LangGraph 不知道如何合并这两个更新,所以抛出 InvalidUpdateError
错误。
🔑 解决方案:使用 Reducer
什么是 Reducer?
Reducer 是一个函数,用于定义如何合并多个并行更新。在 Python 中,我们使用 Annotated
类型提示来指定 reducer。
Python 知识点:Annotated
类型提示
Annotated
是 Python 3.9+ 引入的类型提示工具,允许我们为类型添加元数据:
from typing import Annotated
import operator
# 语法:Annotated[类型, 元数据]
state: Annotated[list, operator.add]
# ^^^^^^^^ ^^^^ ^^^^^^^^^^^^^
# 关键字 类型 reducer 函数
使用 operator.add
作为 Reducer
operator.add
是 Python 内置的加法运算符函数。对于列表,它执行列表拼接:
import operator
# operator.add 对列表的作用
result = operator.add([1, 2], [3, 4]) # 结果: [1, 2, 3, 4]
修复后的代码:
import operator
from typing import Annotated
class State(TypedDict):
# 使用 operator.add 作为 reducer,让 state 支持追加
state: Annotated[list, operator.add]
# 构建图(结构同上)
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
执行结果:
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B", "I'm C"]
{'state': ["I'm A", "I'm B", "I'm C", "I'm D"]}
✅ 成功! 现在 b
和 c
的更新都被保留了,它们的结果被拼接到 state
列表中。
⏳ 等待并行节点完成
不同长度的并行路径
当并行路径的长度不同时会发生什么?
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2")) # b 路径的额外节点
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2") # b → b2(更长的路径)
builder.add_edge(["b2", "c"], "d") # 等待 b2 和 c 都完成
builder.add_edge("d", END)
graph = builder.compile()
执行结果:
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]
关键点:
b
、b2
和c
都属于同一个执行步骤(step)- 虽然
c
先完成,但图会等待b2
也完成后,才进入d
节点 - 这是 LangGraph 的自动同步机制
🎨 控制状态更新的顺序
问题:默认顺序不可控
在上面的例子中,虽然都是同一步,但状态更新的顺序是:["I'm A", "I'm B", "I'm C", "I'm B2"]
注意 "I'm C"
出现在 "I'm B2"
之前,即使 b2
是 b
的后续节点。
原因: LangGraph 根据图的拓扑结构决定更新顺序,我们无法直接控制。
解决方案:自定义 Reducer
我们可以编写自定义 reducer 来排序状态更新:
def sorting_reducer(left, right):
"""合并并排序列表中的值"""
# 确保 left 是列表
if not isinstance(left, list):
left = [left]
# 确保 right 是列表
if not isinstance(right, list):
right = [right]
# 合并并排序
return sorted(left + right, reverse=False)
class State(TypedDict):
# 使用自定义的 sorting_reducer
state: Annotated[list, sorting_reducer]
执行结果:
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm B2", "I'm C"]
{'state': ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"]}
注意: 现在顺序变成了 ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"]
,按字母顺序排列!
🔍 Reducer 函数详解
自定义 reducer 的工作原理:
def sorting_reducer(left, right):
# left: 当前状态的值
# right: 新节点返回的值
# 1. 标准化为列表
if not isinstance(left, list):
left = [left]
if not isinstance(right, list):
right = [right]
# 2. 合并并处理(这里是排序)
return sorted(left + right, reverse=False)
其他 reducer 示例:
# 只保留最新的 N 个值
def keep_last_n(n=5):
def reducer(left, right):
combined = (left if isinstance(left, list) else [left]) + \
(right if isinstance(right, list) else [right])
return combined[-n:]
return reducer
# 去重
def unique_reducer(left, right):
combined = (left if isinstance(left, list) else [left]) + \
(right if isinstance(right, list) else [right])
return list(dict.fromkeys(combined)) # 保持顺序的去重
🤖 实战案例:并行检索生成答案
场景
我们要构建一个问答系统,同时从两个来源获取信息:
- Wikipedia - 获取百科知识
- Web Search (Tavily) - 获取最新网络信息
然后让 LLM 基于这些信息生成答案。
State 定义
import operator
from typing import Annotated
from typing_extensions import TypedDict
class State(TypedDict):
question: str # 用户问题
answer: str # 生成的答案
context: Annotated[list, operator.add] # 检索到的上下文(支持并行追加)
关键点:
context
使用operator.add
作为 reducer,允许两个检索节点并行写入
节点实现
1. Web 搜索节点
from langchain_community.tools import TavilySearchResults
def search_web(state):
"""从网络搜索获取文档"""
# 使用 Tavily 搜索
tavily_search = TavilySearchResults(max_results=3)
search_docs = tavily_search.invoke(state['question'])
# 格式化搜索结果
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
for doc in search_docs
]
)
return {"context": [formatted_search_docs]}
知识点:
- 返回
{"context": [formatted_search_docs]}
,注意context
的值是一个列表 - 这样
operator.add
可以将其拼接到现有的 context 中
2. Wikipedia 搜索节点
from langchain_community.document_loaders import WikipediaLoader
def search_wikipedia(state):
"""从 Wikipedia 获取文档"""
# 搜索 Wikipedia
search_docs = WikipediaLoader(
query=state['question'],
load_max_docs=2
).load()
# 格式化搜索结果
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
for doc in search_docs
]
)
return {"context": [formatted_search_docs]}
3. 生成答案节点
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def generate_answer(state):
"""基于上下文生成答案"""
# 获取状态
context = state["context"]
question = state["question"]
# 构建提示词
answer_template = """Answer the question {question} using this context: {context}"""
answer_instructions = answer_template.format(
question=question,
context=context
)
# 调用 LLM 生成答案
answer = llm.invoke([
SystemMessage(content=answer_instructions),
HumanMessage(content="Answer the question.")
])
return {"answer": answer}
构建并行图
from langgraph.graph import StateGraph, START, END
builder = StateGraph(State)
# 添加节点
builder.add_node("search_web", search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)
# 并行流程
builder.add_edge(START, "search_wikipedia") # START → search_wikipedia
builder.add_edge(START, "search_web") # START → search_web(并行)
builder.add_edge("search_wikipedia", "generate_answer") # 汇聚
builder.add_edge("search_web", "generate_answer") # 汇聚
builder.add_edge("generate_answer", END)
graph = builder.compile()
流程图:
START
/ \
/ \
search_web search_wikipedia
\ /
\ /
generate_answer
|
END
执行查询
result = graph.invoke({
"question": "How were Nvidia's Q2 2024 earnings"
})
print(result['answer'].content)
输出示例:
Nvidia's Q2 2024 earnings were strong, showcasing record revenue and a robust
performance in its data center division. The company reported revenue of $30.0
billion, which was up 15% from the previous quarter and up 122% from a year ago.
GAAP earnings per diluted share were $0.67, up 12% from the previous quarter and
up 168% from a year ago, while non-GAAP earnings per diluted share were $0.68...
🎓 核心知识点总结
LangGraph 特有概念
并行执行模式
- Fan-out:一个节点扇出到多个节点
- Fan-in:多个节点汇聚到一个节点
Reducer 机制
- 必须使用 reducer 处理并行更新同一 state 键的情况
operator.add
用于列表拼接- 可以自定义 reducer 控制合并逻辑
执行步骤(Step)
- 并行节点在同一个 step 中执行
- 图会等待所有并行路径完成后再进入下一步
状态更新顺序
- 默认顺序由 LangGraph 根据图拓扑决定
- 可通过自定义 reducer 控制顺序
Python 特有知识点
TypedDict
pythonfrom typing_extensions import TypedDict class State(TypedDict): field1: str field2: int
用于定义字典的类型结构
Annotated
类型提示pythonfrom typing import Annotated # 语法:Annotated[类型, 元数据...] field: Annotated[list, operator.add]
为类型添加额外的元数据
operator.add
pythonimport operator # 对列表执行拼接 operator.add([1, 2], [3, 4]) # [1, 2, 3, 4]
可调用类(Callable Class)
pythonclass ReturnNodeValue: def __init__(self, value): self._value = value def __call__(self, state): # 实现 __call__ 方法 return {"state": [self._value]}
实现
__call__
方法使对象可以像函数一样调用
💡 最佳实践
1. 何时使用并行执行?
✅ 适用场景:
- 多个独立的数据源检索(如同时查询数据库、API、搜索引擎)
- 多个独立的处理任务(如同时进行文本分类、实体提取、情感分析)
- 需要多个 Agent 同时工作的场景
❌ 不适用场景:
- 任务之间有依赖关系
- 需要严格的执行顺序
- 资源受限(如 API 调用限制)
2. Reducer 选择指南
场景 | Reducer | 说明 |
---|---|---|
收集所有结果 | operator.add | 拼接所有更新 |
需要排序 | 自定义 sorting_reducer | 合并后排序 |
去重 | 自定义 unique_reducer | 只保留唯一值 |
只保留最新 | 自定义 lambda left, right: right | 覆盖模式 |
保留最近 N 个 | 自定义 keep_last_n | 滑动窗口 |
3. 状态设计建议
class State(TypedDict):
# 输入字段(不会被并行更新)
question: str
# 并行收集的数据(使用 reducer)
search_results: Annotated[list, operator.add]
# 最终输出(单一节点更新)
answer: str
🚀 进阶技巧
稳定排序(Stable Sorting)
对于需要精确控制顺序的场景,可以使用"临时字段 + 汇聚节点"模式:
class State(TypedDict):
question: str
temp_results: dict # 临时存储并行结果
final_results: list # 排序后的最终结果
def collect_results(state):
"""汇聚节点:合并并排序临时结果"""
results = state['temp_results']
# 按自定义顺序排序
ordered = [results['source1'], results['source2'], results['source3']]
return {"final_results": ordered, "temp_results": {}} # 清空临时字段
详见 官方文档
📖 扩展阅读
- LangGraph 并行执行官方文档
- LangGraph State 和 Reducer 详解
- Python
typing
模块文档
总结:并行执行是 LangGraph 的强大功能,通过合理使用 reducer 和图结构设计,可以构建高效的多任务、多智能体系统。掌握这个概念是进入高级 LangGraph 开发的关键一步!