LangGraph Map-Reduce 详细解读
📚 概述
本文档详细解读 LangGraph 中的 Map-Reduce 模式。这是一种经典的分布式计算模式,在 LangGraph 中用于高效的任务分解和并行处理。通过 Map-Reduce,我们可以将复杂任务拆分成多个子任务并行执行,然后聚合结果。
🎯 核心概念
什么是 Map-Reduce?
Map-Reduce 是一种编程模型,包含两个阶段:
Map(映射)阶段
- 将大任务分解为多个小的子任务
- 并行处理每个子任务
- 每个子任务独立执行,互不影响
Reduce(归约)阶段
- 收集所有子任务的结果
- 对结果进行聚合、汇总或筛选
- 产生最终输出
经典应用场景
- 文档处理:分段处理长文档,然后汇总摘要
- 数据分析:并行分析多个数据源,聚合统计结果
- 内容生成:生成多个候选内容,选择最佳结果
- 多源查询:并行查询多个 API,合并结果
🎭 实战案例:笑话生成系统
我们将构建一个智能笑话生成系统,演示 Map-Reduce 的完整流程:
需求:
- Map 阶段:根据一个主题(如"动物"),生成多个子主题的笑话
- Reduce 阶段:从所有生成的笑话中,选出最好的一个
系统架构图
用户输入主题 "animals"
↓
[generate_topics] 生成子主题: [mammals, reptiles, birds]
↓
(Send API 动态分发)
↓
┌────────┬────────┬────────┐
↓ ↓ ↓ ↓
[joke-1] [joke-2] [joke-3] (Map 阶段:并行生成)
↓ ↓ ↓
└────────┴────────┴────────┘
↓
[best_joke] (Reduce 阶段:选择最佳)
↓
返回结果
🔧 代码实现详解
1. 定义提示词和模型
from langchain_openai import ChatOpenAI
# 三个关键提示词
subjects_prompt = """Generate a list of 3 sub-topics that are all related to this overall topic: {topic}."""
joke_prompt = """Generate a joke about {subject}"""
best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one, starting 0 as the ID for the first joke. Jokes: \n\n {jokes}"""
# 初始化 LLM
model = ChatOpenAI(model="gpt-4o", temperature=0)
说明:
subjects_prompt
:将主题拆分为 3 个子主题joke_prompt
:为单个子主题生成笑话best_joke_prompt
:从多个笑话中选择最好的
2. 定义状态(State)
这是 Map-Reduce 的关键!我们需要两种状态:
全局状态(OverallState)
import operator
from typing import Annotated
from typing_extensions import TypedDict
from pydantic import BaseModel
class Subjects(BaseModel):
subjects: list[str]
class BestJoke(BaseModel):
id: int
class OverallState(TypedDict):
topic: str # 用户输入的主题
subjects: list # 生成的子主题列表
jokes: Annotated[list, operator.add] # 收集的笑话(支持并行追加)
best_selected_joke: str # 最终选出的最佳笑话
关键点:
jokes
使用operator.add
reducer,允许多个并行节点同时写入- 这与之前学习的 parallelization 知识一致
局部状态(JokeState)
class JokeState(TypedDict):
subject: str # 单个笑话生成节点只需要知道主题
class Joke(BaseModel):
joke: str
Python 知识点:Pydantic BaseModel
BaseModel
是 Pydantic 库的核心类,用于:
- 数据验证
- 类型检查
- 结构化输出
# 使用示例
class Joke(BaseModel):
joke: str
# LLM 会返回符合这个结构的数据
response = model.with_structured_output(Joke).invoke(prompt)
# response.joke 就是字符串类型的笑话内容
3. Map 阶段:生成子主题
def generate_topics(state: OverallState):
prompt = subjects_prompt.format(topic=state["topic"])
response = model.with_structured_output(Subjects).invoke(prompt)
return {"subjects": response.subjects}
功能: 将用户输入的主题(如 "animals")分解为 3 个子主题(如 ["mammals", "reptiles", "birds"])
4. 动态任务分发:Send API ⭐
这是 Map-Reduce 的核心魔法!
from langgraph.types import Send
def continue_to_jokes(state: OverallState):
# 为每个子主题创建一个 Send 任务
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
Send API 详解:
Send("generate_joke", {"subject": s})
# ^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^
# 目标节点名称 发送的状态数据
重要特性:
- 动态并行化:自动为列表中的每个元素创建并行任务
- 状态灵活性:可以发送任意状态,不需要与
OverallState
完全匹配 - 自动扩展:无论有 3 个还是 300 个子主题,都能自动并行处理
为什么强大?
传统方式需要预先知道有多少个并行任务,而 Send
可以根据运行时数据动态创建任务:
# 如果有 3 个主题,Send 会创建 3 个并行任务
subjects = ["mammals", "reptiles", "birds"]
# 相当于:
# - Send("generate_joke", {"subject": "mammals"})
# - Send("generate_joke", {"subject": "reptiles"})
# - Send("generate_joke", {"subject": "birds"})
# 如果有 10 个主题,自动创建 10 个并行任务!
5. Map 阶段:并行生成笑话
def generate_joke(state: JokeState):
prompt = joke_prompt.format(subject=state["subject"])
response = model.with_structured_output(Joke).invoke(prompt)
return {"jokes": [response.joke]}
关键细节:
- 输入:
JokeState
(只包含subject
) - 输出:
{"jokes": [response.joke]}
(注意是列表!) - 返回的数据会被写回
OverallState
的jokes
字段 - 由于
jokes
有operator.add
reducer,多个并行节点的输出会自动拼接
执行流程示意:
generate_joke(subject="mammals") → 返回 {"jokes": ["joke1"]}
generate_joke(subject="reptiles") → 返回 {"jokes": ["joke2"]}
generate_joke(subject="birds") → 返回 {"jokes": ["joke3"]}
最终 OverallState.jokes = ["joke1", "joke2", "joke3"]
6. Reduce 阶段:选择最佳笑话
def best_joke(state: OverallState):
# 将所有笑话合并为一个字符串
jokes = "\n\n".join(state["jokes"])
# 让 LLM 选择最好的笑话
prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
response = model.with_structured_output(BestJoke).invoke(prompt)
# 返回被选中的笑话
return {"best_selected_joke": state["jokes"][response.id]}
功能:
- 接收所有并行生成的笑话
- 使用 LLM 评估并选择最佳笑话
BestJoke
模型返回最佳笑话的索引(id)- 根据索引从
state["jokes"]
中取出最佳笑话
7. 构建图
from langgraph.graph import END, StateGraph, START
# 创建图
graph = StateGraph(OverallState)
# 添加节点
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)
# 添加边
graph.add_edge(START, "generate_topics")
# 条件边:使用 Send 动态分发任务
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)
# 编译
app = graph.compile()
LangGraph 知识点:条件边与 Send
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
# ^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^
# 源节点 条件函数 可能的目标节点
continue_to_jokes
返回Send
对象列表- LangGraph 会自动为每个
Send
创建一个到generate_joke
的并行路径 - 所有并行任务完成后,自动进入
best_joke
节点
8. 执行图
for s in app.stream({"topic": "animals"}):
print(s)
输出示例:
{'generate_topics': {'subjects': ['mammals', 'reptiles', 'birds']}}
{'generate_joke': {'jokes': ["Why don't mammals ever get lost? Because they always follow their 'instincts'!"]}}
{'generate_joke': {'jokes': ["Why don't alligators like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why do birds fly south for the winter? Because it's too far to walk!"]}}
{'best_joke': {'best_selected_joke': "Why don't alligators like fast food? Because they can't catch it!"}}
观察要点:
- 首先生成 3 个子主题
- 然后并行生成 3 个笑话(注意输出顺序可能不同)
- 最后选出最佳笑话
🎓 核心知识点总结
LangGraph 特有概念
1. Send API
作用: 动态创建并行任务
Send(node_name, state_dict)
特点:
- 运行时动态决定并行数量
- 可以发送自定义状态(不必是完整的图状态)
- 自动处理并行执行和结果聚合
2. Map-Reduce 模式
阶段 | 作用 | 节点数量 | 执行方式 |
---|---|---|---|
Map | 分解任务,生成结果 | 动态(根据数据) | 并行 |
Reduce | 聚合结果 | 1 个 | 串行(等待所有 Map 完成) |
3. 多状态设计
- OverallState:全局状态,贯穿整个图
- JokeState:局部状态,只用于特定节点
- 通过
Send
可以在不同状态之间传递数据
4. 条件边 + Send
graph.add_conditional_edges(
"source_node", # 源节点
condition_function, # 返回 Send 列表的函数
["target_node"] # 目标节点(Send 指向的节点)
)
Python 特有知识点
1. Pydantic BaseModel
from pydantic import BaseModel
class Joke(BaseModel):
joke: str
# 用于 LLM 结构化输出
response = model.with_structured_output(Joke).invoke(prompt)
优势:
- 自动类型验证
- 清晰的数据结构
- 与 LangChain 无缝集成
2. TypedDict vs BaseModel
特性 | TypedDict | BaseModel |
---|---|---|
类型检查 | 静态(IDE 提示) | 运行时验证 |
验证 | 无 | 有 |
用途 | 状态定义 | 数据模型、API 输出 |
性能 | 更快(无验证) | 稍慢(有验证) |
# TypedDict - 用于 State
class OverallState(TypedDict):
topic: str
# BaseModel - 用于结构化输出
class Joke(BaseModel):
joke: str
3. 列表推导式 + Send
# 创建多个 Send 任务的优雅方式
[Send("generate_joke", {"subject": s}) for s in subjects]
# 等价于:
result = []
for s in subjects:
result.append(Send("generate_joke", {"subject": s}))
💡 最佳实践
1. 何时使用 Map-Reduce?
✅ 适用场景:
- 需要对多个数据项执行相同操作(如批量翻译、批量摘要)
- 任务可以自然分解为独立子任务(如分段处理文档)
- 需要从多个候选结果中筛选(如生成多个答案选最佳)
- 数据量大,需要并行加速(如分析多个数据源)
❌ 不适用场景:
- 子任务之间有依赖关系
- 无法自然分解的任务
- 单一数据源的简单查询
2. Send API 使用技巧
技巧 1:动态控制并行数量
def continue_to_jokes(state: OverallState):
# 可以根据条件过滤
subjects = [s for s in state["subjects"] if len(s) > 3]
return [Send("generate_joke", {"subject": s}) for s in subjects]
技巧 2:发送额外上下文
def continue_to_jokes(state: OverallState):
return [
Send("generate_joke", {
"subject": s,
"original_topic": state["topic"], # 传递额外信息
"style": "family-friendly"
})
for s in state["subjects"]
]
技巧 3:条件性发送
def continue_to_jokes(state: OverallState):
sends = []
for i, s in enumerate(state["subjects"]):
if i < 5: # 最多只处理前 5 个
sends.append(Send("generate_joke", {"subject": s}))
return sends
3. 状态设计原则
原则 1:最小化局部状态
# ✅ 好的设计 - 只包含必需字段
class JokeState(TypedDict):
subject: str
# ❌ 不好的设计 - 包含不需要的字段
class JokeState(TypedDict):
subject: str
topic: str # generate_joke 不需要这个
jokes: list # 也不需要这个
原则 2:使用 Reducer 聚合结果
# Map 节点的输出字段必须有 reducer
class OverallState(TypedDict):
jokes: Annotated[list, operator.add] # ✅ 正确
# jokes: list # ❌ 错误!并行更新会冲突
原则 3:清晰的状态流转
OverallState (完整状态)
↓
Send → JokeState (子集)
↓
返回 → OverallState.jokes (部分更新)
🚀 进阶技巧
1. 多层 Map-Reduce
可以嵌套多个 Map-Reduce 层次:
主题
↓ Map
子主题 (Level 1)
↓ Map
详细主题 (Level 2)
↓ Reduce
汇总子主题
↓ Reduce
最终结果
2. 带错误处理的 Map-Reduce
def generate_joke(state: JokeState):
try:
prompt = joke_prompt.format(subject=state["subject"])
response = model.with_structured_output(Joke).invoke(prompt)
return {"jokes": [response.joke]}
except Exception as e:
# 返回错误标记或默认值
return {"jokes": [f"Error generating joke for {state['subject']}"]}
3. 限制并行度
虽然 Send
会自动并行,但有时需要限制同时执行的任务数(如 API 速率限制):
# 方法 1:在条件函数中限制
def continue_to_jokes(state: OverallState):
# 只发送前 N 个
max_parallel = 5
subjects = state["subjects"][:max_parallel]
return [Send("generate_joke", {"subject": s}) for s in subjects]
# 方法 2:分批处理(需要更复杂的图设计)
📊 Map-Reduce vs 简单并行
特性 | Map-Reduce (Send) | 简单并行 (Fan-out) |
---|---|---|
并行数量 | 动态(运行时决定) | 静态(设计时固定) |
状态传递 | 灵活(可自定义) | 固定(使用图状态) |
适用场景 | 列表处理、批量任务 | 固定数量的并行路径 |
复杂度 | 中等 | 低 |
扩展性 | 高 | 低 |
示例对比:
# 简单并行 - 固定 3 个路径
builder.add_edge("start", "task1")
builder.add_edge("start", "task2")
builder.add_edge("start", "task3")
# Map-Reduce - 动态 N 个任务
def send_tasks(state):
return [Send("task", {"data": d}) for d in state["data_list"]]
builder.add_conditional_edges("start", send_tasks, ["task"])
🎯 实际应用案例
案例 1:文档摘要
# Map: 为每个段落生成摘要
def summarize_chunk(state: ChunkState):
summary = llm.invoke(f"Summarize: {state['chunk']}")
return {"summaries": [summary]}
# Reduce: 合并所有段落摘要
def combine_summaries(state: OverallState):
all_summaries = "\n".join(state["summaries"])
final = llm.invoke(f"Create final summary: {all_summaries}")
return {"final_summary": final}
案例 2:多语言翻译
# Map: 翻译到多种语言
def send_to_translate(state):
languages = ["es", "fr", "de", "zh"]
return [Send("translate", {"lang": lang, "text": state["text"]})
for lang in languages]
# Reduce: 收集所有翻译
def collect_translations(state):
return {"all_translations": state["translations"]}
案例 3:多角度分析
# Map: 从不同角度分析文本
def send_to_analyze(state):
perspectives = ["technical", "business", "user", "security"]
return [Send("analyze", {"perspective": p, "text": state["text"]})
for p in perspectives]
# Reduce: 综合所有分析
def synthesize(state):
combined = llm.invoke(f"Synthesize these analyses: {state['analyses']}")
return {"final_analysis": combined}
📖 扩展阅读
🔍 常见问题
Q1: Send 和普通边有什么区别?
普通边: 静态路由,设计时确定
graph.add_edge("A", "B") # A 总是流向 B
Send: 动态路由,运行时确定
# 根据数据动态创建多个并行任务
return [Send("B", data) for data in dynamic_data]
Q2: 为什么 generate_joke 返回 {"jokes": [joke]}
而不是 {"jokes": joke}
?
因为 OverallState.jokes
使用了 operator.add
reducer,它期望操作列表:
# 正确 ✅
operator.add(["joke1"], ["joke2"]) # → ["joke1", "joke2"]
# 错误 ❌
operator.add("joke1", "joke2") # → "joke1joke2" (字符串拼接)
Q3: 可以在 Send 中发送完全不同的状态吗?
可以!Send
发送的状态不需要与 OverallState
匹配:
# OverallState 有 topic, subjects, jokes
# 但 Send 可以发送任意结构
Send("generate_joke", {"subject": "cats", "style": "silly"})
只要目标节点(generate_joke
)能处理这个状态即可。
总结:Map-Reduce 是 LangGraph 中处理批量、并行任务的强大模式。通过 Send
API,我们可以动态创建任意数量的并行任务,然后用 reducer 优雅地聚合结果。这是构建可扩展、高性能 AI 应用的关键技术!