Skip to content

LangGraph Map-Reduce 详细解读

📚 概述

本文档详细解读 LangGraph 中的 Map-Reduce 模式。这是一种经典的分布式计算模式,在 LangGraph 中用于高效的任务分解和并行处理。通过 Map-Reduce,我们可以将复杂任务拆分成多个子任务并行执行,然后聚合结果。

🎯 核心概念

什么是 Map-Reduce?

Map-Reduce 是一种编程模型,包含两个阶段:

  1. Map(映射)阶段

    • 将大任务分解为多个小的子任务
    • 并行处理每个子任务
    • 每个子任务独立执行,互不影响
  2. Reduce(归约)阶段

    • 收集所有子任务的结果
    • 对结果进行聚合、汇总或筛选
    • 产生最终输出

经典应用场景

  • 文档处理:分段处理长文档,然后汇总摘要
  • 数据分析:并行分析多个数据源,聚合统计结果
  • 内容生成:生成多个候选内容,选择最佳结果
  • 多源查询:并行查询多个 API,合并结果

🎭 实战案例:笑话生成系统

我们将构建一个智能笑话生成系统,演示 Map-Reduce 的完整流程:

需求:

  1. Map 阶段:根据一个主题(如"动物"),生成多个子主题的笑话
  2. Reduce 阶段:从所有生成的笑话中,选出最好的一个

系统架构图

用户输入主题 "animals"

   [generate_topics] 生成子主题: [mammals, reptiles, birds]

    (Send API 动态分发)

   ┌────────┬────────┬────────┐
   ↓        ↓        ↓        ↓
[joke-1] [joke-2] [joke-3]  (Map 阶段:并行生成)
   ↓        ↓        ↓
   └────────┴────────┴────────┘

        [best_joke]  (Reduce 阶段:选择最佳)

         返回结果

🔧 代码实现详解

1. 定义提示词和模型

python
from langchain_openai import ChatOpenAI

# 三个关键提示词
subjects_prompt = """Generate a list of 3 sub-topics that are all related to this overall topic: {topic}."""
joke_prompt = """Generate a joke about {subject}"""
best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one, starting 0 as the ID for the first joke. Jokes: \n\n  {jokes}"""

# 初始化 LLM
model = ChatOpenAI(model="gpt-4o", temperature=0)

说明:

  • subjects_prompt:将主题拆分为 3 个子主题
  • joke_prompt:为单个子主题生成笑话
  • best_joke_prompt:从多个笑话中选择最好的

2. 定义状态(State)

这是 Map-Reduce 的关键!我们需要两种状态:

全局状态(OverallState)

python
import operator
from typing import Annotated
from typing_extensions import TypedDict
from pydantic import BaseModel

class Subjects(BaseModel):
    subjects: list[str]

class BestJoke(BaseModel):
    id: int

class OverallState(TypedDict):
    topic: str                                  # 用户输入的主题
    subjects: list                               # 生成的子主题列表
    jokes: Annotated[list, operator.add]        # 收集的笑话(支持并行追加)
    best_selected_joke: str                     # 最终选出的最佳笑话

关键点:

  • jokes 使用 operator.add reducer,允许多个并行节点同时写入
  • 这与之前学习的 parallelization 知识一致

局部状态(JokeState)

python
class JokeState(TypedDict):
    subject: str  # 单个笑话生成节点只需要知道主题

class Joke(BaseModel):
    joke: str

Python 知识点:Pydantic BaseModel

BaseModel 是 Pydantic 库的核心类,用于:

  • 数据验证
  • 类型检查
  • 结构化输出
python
# 使用示例
class Joke(BaseModel):
    joke: str

# LLM 会返回符合这个结构的数据
response = model.with_structured_output(Joke).invoke(prompt)
# response.joke 就是字符串类型的笑话内容

3. Map 阶段:生成子主题

python
def generate_topics(state: OverallState):
    prompt = subjects_prompt.format(topic=state["topic"])
    response = model.with_structured_output(Subjects).invoke(prompt)
    return {"subjects": response.subjects}

功能: 将用户输入的主题(如 "animals")分解为 3 个子主题(如 ["mammals", "reptiles", "birds"])


4. 动态任务分发:Send API ⭐

这是 Map-Reduce 的核心魔法

python
from langgraph.types import Send

def continue_to_jokes(state: OverallState):
    # 为每个子主题创建一个 Send 任务
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

Send API 详解:

python
Send("generate_joke", {"subject": s})
#    ^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^
#    目标节点名称      发送的状态数据

重要特性:

  1. 动态并行化:自动为列表中的每个元素创建并行任务
  2. 状态灵活性:可以发送任意状态,不需要与 OverallState 完全匹配
  3. 自动扩展:无论有 3 个还是 300 个子主题,都能自动并行处理

为什么强大?

传统方式需要预先知道有多少个并行任务,而 Send 可以根据运行时数据动态创建任务:

python
# 如果有 3 个主题,Send 会创建 3 个并行任务
subjects = ["mammals", "reptiles", "birds"]
# 相当于:
# - Send("generate_joke", {"subject": "mammals"})
# - Send("generate_joke", {"subject": "reptiles"})
# - Send("generate_joke", {"subject": "birds"})

# 如果有 10 个主题,自动创建 10 个并行任务!

5. Map 阶段:并行生成笑话

python
def generate_joke(state: JokeState):
    prompt = joke_prompt.format(subject=state["subject"])
    response = model.with_structured_output(Joke).invoke(prompt)
    return {"jokes": [response.joke]}

关键细节:

  • 输入:JokeState(只包含 subject
  • 输出:{"jokes": [response.joke]}(注意是列表!)
  • 返回的数据会被写回 OverallStatejokes 字段
  • 由于 jokesoperator.add reducer,多个并行节点的输出会自动拼接

执行流程示意:

generate_joke(subject="mammals")  → 返回 {"jokes": ["joke1"]}
generate_joke(subject="reptiles") → 返回 {"jokes": ["joke2"]}
generate_joke(subject="birds")    → 返回 {"jokes": ["joke3"]}

最终 OverallState.jokes = ["joke1", "joke2", "joke3"]

6. Reduce 阶段:选择最佳笑话

python
def best_joke(state: OverallState):
    # 将所有笑话合并为一个字符串
    jokes = "\n\n".join(state["jokes"])

    # 让 LLM 选择最好的笑话
    prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
    response = model.with_structured_output(BestJoke).invoke(prompt)

    # 返回被选中的笑话
    return {"best_selected_joke": state["jokes"][response.id]}

功能:

  • 接收所有并行生成的笑话
  • 使用 LLM 评估并选择最佳笑话
  • BestJoke 模型返回最佳笑话的索引(id)
  • 根据索引从 state["jokes"] 中取出最佳笑话

7. 构建图

python
from langgraph.graph import END, StateGraph, START

# 创建图
graph = StateGraph(OverallState)

# 添加节点
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)

# 添加边
graph.add_edge(START, "generate_topics")

# 条件边:使用 Send 动态分发任务
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])

graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)

# 编译
app = graph.compile()

LangGraph 知识点:条件边与 Send

python
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
#                          ^^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^^
#                          源节点             条件函数             可能的目标节点
  • continue_to_jokes 返回 Send 对象列表
  • LangGraph 会自动为每个 Send 创建一个到 generate_joke 的并行路径
  • 所有并行任务完成后,自动进入 best_joke 节点

8. 执行图

python
for s in app.stream({"topic": "animals"}):
    print(s)

输出示例:

python
{'generate_topics': {'subjects': ['mammals', 'reptiles', 'birds']}}

{'generate_joke': {'jokes': ["Why don't mammals ever get lost? Because they always follow their 'instincts'!"]}}
{'generate_joke': {'jokes': ["Why don't alligators like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why do birds fly south for the winter? Because it's too far to walk!"]}}

{'best_joke': {'best_selected_joke': "Why don't alligators like fast food? Because they can't catch it!"}}

观察要点:

  1. 首先生成 3 个子主题
  2. 然后并行生成 3 个笑话(注意输出顺序可能不同)
  3. 最后选出最佳笑话

🎓 核心知识点总结

LangGraph 特有概念

1. Send API

作用: 动态创建并行任务

python
Send(node_name, state_dict)

特点:

  • 运行时动态决定并行数量
  • 可以发送自定义状态(不必是完整的图状态)
  • 自动处理并行执行和结果聚合

2. Map-Reduce 模式

阶段作用节点数量执行方式
Map分解任务,生成结果动态(根据数据)并行
Reduce聚合结果1 个串行(等待所有 Map 完成)

3. 多状态设计

  • OverallState:全局状态,贯穿整个图
  • JokeState:局部状态,只用于特定节点
  • 通过 Send 可以在不同状态之间传递数据

4. 条件边 + Send

python
graph.add_conditional_edges(
    "source_node",           # 源节点
    condition_function,      # 返回 Send 列表的函数
    ["target_node"]         # 目标节点(Send 指向的节点)
)

Python 特有知识点

1. Pydantic BaseModel

python
from pydantic import BaseModel

class Joke(BaseModel):
    joke: str

# 用于 LLM 结构化输出
response = model.with_structured_output(Joke).invoke(prompt)

优势:

  • 自动类型验证
  • 清晰的数据结构
  • 与 LangChain 无缝集成

2. TypedDict vs BaseModel

特性TypedDictBaseModel
类型检查静态(IDE 提示)运行时验证
验证
用途状态定义数据模型、API 输出
性能更快(无验证)稍慢(有验证)
python
# TypedDict - 用于 State
class OverallState(TypedDict):
    topic: str

# BaseModel - 用于结构化输出
class Joke(BaseModel):
    joke: str

3. 列表推导式 + Send

python
# 创建多个 Send 任务的优雅方式
[Send("generate_joke", {"subject": s}) for s in subjects]

# 等价于:
result = []
for s in subjects:
    result.append(Send("generate_joke", {"subject": s}))

💡 最佳实践

1. 何时使用 Map-Reduce?

适用场景:

  • 需要对多个数据项执行相同操作(如批量翻译、批量摘要)
  • 任务可以自然分解为独立子任务(如分段处理文档)
  • 需要从多个候选结果中筛选(如生成多个答案选最佳)
  • 数据量大,需要并行加速(如分析多个数据源)

不适用场景:

  • 子任务之间有依赖关系
  • 无法自然分解的任务
  • 单一数据源的简单查询

2. Send API 使用技巧

技巧 1:动态控制并行数量

python
def continue_to_jokes(state: OverallState):
    # 可以根据条件过滤
    subjects = [s for s in state["subjects"] if len(s) > 3]
    return [Send("generate_joke", {"subject": s}) for s in subjects]

技巧 2:发送额外上下文

python
def continue_to_jokes(state: OverallState):
    return [
        Send("generate_joke", {
            "subject": s,
            "original_topic": state["topic"],  # 传递额外信息
            "style": "family-friendly"
        })
        for s in state["subjects"]
    ]

技巧 3:条件性发送

python
def continue_to_jokes(state: OverallState):
    sends = []
    for i, s in enumerate(state["subjects"]):
        if i < 5:  # 最多只处理前 5 个
            sends.append(Send("generate_joke", {"subject": s}))
    return sends

3. 状态设计原则

原则 1:最小化局部状态

python
# ✅ 好的设计 - 只包含必需字段
class JokeState(TypedDict):
    subject: str

# ❌ 不好的设计 - 包含不需要的字段
class JokeState(TypedDict):
    subject: str
    topic: str  # generate_joke 不需要这个
    jokes: list  # 也不需要这个

原则 2:使用 Reducer 聚合结果

python
# Map 节点的输出字段必须有 reducer
class OverallState(TypedDict):
    jokes: Annotated[list, operator.add]  # ✅ 正确
    # jokes: list  # ❌ 错误!并行更新会冲突

原则 3:清晰的状态流转

python
OverallState (完整状态)

Send → JokeState (子集)

返回 → OverallState.jokes (部分更新)

🚀 进阶技巧

1. 多层 Map-Reduce

可以嵌套多个 Map-Reduce 层次:

python
主题
 ↓ Map
子主题 (Level 1)
 ↓ Map
详细主题 (Level 2)
 ↓ Reduce
汇总子主题
 ↓ Reduce
最终结果

2. 带错误处理的 Map-Reduce

python
def generate_joke(state: JokeState):
    try:
        prompt = joke_prompt.format(subject=state["subject"])
        response = model.with_structured_output(Joke).invoke(prompt)
        return {"jokes": [response.joke]}
    except Exception as e:
        # 返回错误标记或默认值
        return {"jokes": [f"Error generating joke for {state['subject']}"]}

3. 限制并行度

虽然 Send 会自动并行,但有时需要限制同时执行的任务数(如 API 速率限制):

python
# 方法 1:在条件函数中限制
def continue_to_jokes(state: OverallState):
    # 只发送前 N 个
    max_parallel = 5
    subjects = state["subjects"][:max_parallel]
    return [Send("generate_joke", {"subject": s}) for s in subjects]

# 方法 2:分批处理(需要更复杂的图设计)

📊 Map-Reduce vs 简单并行

特性Map-Reduce (Send)简单并行 (Fan-out)
并行数量动态(运行时决定)静态(设计时固定)
状态传递灵活(可自定义)固定(使用图状态)
适用场景列表处理、批量任务固定数量的并行路径
复杂度中等
扩展性

示例对比:

python
# 简单并行 - 固定 3 个路径
builder.add_edge("start", "task1")
builder.add_edge("start", "task2")
builder.add_edge("start", "task3")

# Map-Reduce - 动态 N 个任务
def send_tasks(state):
    return [Send("task", {"data": d}) for d in state["data_list"]]
builder.add_conditional_edges("start", send_tasks, ["task"])

🎯 实际应用案例

案例 1:文档摘要

python
# Map: 为每个段落生成摘要
def summarize_chunk(state: ChunkState):
    summary = llm.invoke(f"Summarize: {state['chunk']}")
    return {"summaries": [summary]}

# Reduce: 合并所有段落摘要
def combine_summaries(state: OverallState):
    all_summaries = "\n".join(state["summaries"])
    final = llm.invoke(f"Create final summary: {all_summaries}")
    return {"final_summary": final}

案例 2:多语言翻译

python
# Map: 翻译到多种语言
def send_to_translate(state):
    languages = ["es", "fr", "de", "zh"]
    return [Send("translate", {"lang": lang, "text": state["text"]})
            for lang in languages]

# Reduce: 收集所有翻译
def collect_translations(state):
    return {"all_translations": state["translations"]}

案例 3:多角度分析

python
# Map: 从不同角度分析文本
def send_to_analyze(state):
    perspectives = ["technical", "business", "user", "security"]
    return [Send("analyze", {"perspective": p, "text": state["text"]})
            for p in perspectives]

# Reduce: 综合所有分析
def synthesize(state):
    combined = llm.invoke(f"Synthesize these analyses: {state['analyses']}")
    return {"final_analysis": combined}

📖 扩展阅读


🔍 常见问题

Q1: Send 和普通边有什么区别?

普通边: 静态路由,设计时确定

python
graph.add_edge("A", "B")  # A 总是流向 B

Send: 动态路由,运行时确定

python
# 根据数据动态创建多个并行任务
return [Send("B", data) for data in dynamic_data]

Q2: 为什么 generate_joke 返回 {"jokes": [joke]} 而不是 {"jokes": joke}

因为 OverallState.jokes 使用了 operator.add reducer,它期望操作列表:

python
# 正确 ✅
operator.add(["joke1"], ["joke2"])  # → ["joke1", "joke2"]

# 错误 ❌
operator.add("joke1", "joke2")  # → "joke1joke2" (字符串拼接)

Q3: 可以在 Send 中发送完全不同的状态吗?

可以!Send 发送的状态不需要与 OverallState 匹配:

python
# OverallState 有 topic, subjects, jokes
# 但 Send 可以发送任意结构
Send("generate_joke", {"subject": "cats", "style": "silly"})

只要目标节点(generate_joke)能处理这个状态即可。


总结:Map-Reduce 是 LangGraph 中处理批量、并行任务的强大模式。通过 Send API,我们可以动态创建任意数量的并行任务,然后用 reducer 优雅地聚合结果。这是构建可扩展、高性能 AI 应用的关键技术!

基于 MIT 许可证发布。内容版权归作者所有。