Skip to content

LangGraph Map-Reduce 详细解读

📚 概述

本文档详细解读 LangGraph 中的 Map-Reduce 模式。这是一种经典的分布式计算模式,在 LangGraph 中用于高效的任务分解和并行处理。通过 Map-Reduce,我们可以将复杂任务拆分成多个子任务并行执行,然后聚合结果。

🎯 核心概念

什么是 Map-Reduce?

Map-Reduce 是一种编程模型,包含两个阶段:

  1. Map(映射)阶段

    • 将大任务分解为多个小的子任务
    • 并行处理每个子任务
    • 每个子任务独立执行,互不影响
  2. Reduce(归约)阶段

    • 收集所有子任务的结果
    • 对结果进行聚合、汇总或筛选
    • 产生最终输出

📚 术语表

术语名称LangGraph 定义和解读Python 定义和说明重要程度
Map-Reduce分解任务并行处理(Map),然后聚合结果(Reduce)的编程模式经典分布式计算模式,Google MapReduce 论文提出⭐⭐⭐⭐⭐
Send API动态创建并行任务的核心 API,格式:Send(节点名, 状态字典)from langgraph.types import Send,运行时决定并行数量⭐⭐⭐⭐⭐
Map 阶段将大任务分解为多个小任务并并行处理的阶段每个子任务独立执行,互不影响,数量动态可变⭐⭐⭐⭐⭐
Reduce 阶段收集所有子任务结果并聚合、汇总的阶段使用 reducer(如 operator.add)合并并行结果⭐⭐⭐⭐⭐
动态分发根据运行时数据决定并行任务数量的机制通过 Send 返回列表实现,列表长度即并行任务数⭐⭐⭐⭐⭐
条件边 + Sendadd_conditional_edges(源节点, 返回Send列表的函数, [目标节点])LangGraph 实现动态并行的关键,自动处理并行和聚合⭐⭐⭐⭐⭐
Pydantic BaseModel定义数据模型并支持自动验证的类from pydantic import BaseModel,用于 LLM 结构化输出⭐⭐⭐⭐
with_structured_output()LangChain 方法,强制 LLM 返回符合指定模型的 JSONllm.with_structured_output(Model).invoke(prompt)⭐⭐⭐⭐
OverallState vs 局部State全局状态贯穿整个图,局部状态仅用于特定节点OverallState 收集所有结果,局部State 只包含必需字段⭐⭐⭐⭐
operator.addPython 内置函数,用作列表拼接的 reduceroperator.add([1,2], [3,4]) 返回 [1,2,3,4]⭐⭐⭐⭐

经典应用场景

  • 文档处理:分段处理长文档,然后汇总摘要
  • 数据分析:并行分析多个数据源,聚合统计结果
  • 内容生成:生成多个候选内容,选择最佳结果
  • 多源查询:并行查询多个 API,合并结果

🎭 实战案例:笑话生成系统

我们将构建一个智能笑话生成系统,演示 Map-Reduce 的完整流程:

需求:

  1. Map 阶段:根据一个主题(如"动物"),生成多个子主题的笑话
  2. Reduce 阶段:从所有生成的笑话中,选出最好的一个

系统架构图

用户输入主题 "animals"

   [generate_topics] 生成子主题: [mammals, reptiles, birds]

    (Send API 动态分发)

   ┌────────┬────────┬────────┐
   ↓        ↓        ↓        ↓
[joke-1] [joke-2] [joke-3]  (Map 阶段:并行生成)
   ↓        ↓        ↓
   └────────┴────────┴────────┘

        [best_joke]  (Reduce 阶段:选择最佳)

         返回结果

🔧 代码实现详解

1. 定义提示词和模型

python
from langchain_openai import ChatOpenAI

# 三个关键提示词
subjects_prompt = """Generate a list of 3 sub-topics that are all related to this overall topic: {topic}."""
joke_prompt = """Generate a joke about {subject}"""
best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one, starting 0 as the ID for the first joke. Jokes: \n\n  {jokes}"""

# 初始化 LLM
model = ChatOpenAI(model="gpt-5-nano", temperature=0)

说明:

  • subjects_prompt:将主题拆分为 3 个子主题
  • joke_prompt:为单个子主题生成笑话
  • best_joke_prompt:从多个笑话中选择最好的

2. 定义状态(State)

这是 Map-Reduce 的关键!我们需要两种状态:

全局状态(OverallState)

python
import operator
from typing import Annotated
from typing_extensions import TypedDict
from pydantic import BaseModel

class Subjects(BaseModel):
    subjects: list[str]

class BestJoke(BaseModel):
    id: int

class OverallState(TypedDict):
    topic: str                                  # 用户输入的主题
    subjects: list                               # 生成的子主题列表
    jokes: Annotated[list, operator.add]        # 收集的笑话(支持并行追加)
    best_selected_joke: str                     # 最终选出的最佳笑话

关键点:

  • jokes 使用 operator.add reducer,允许多个并行节点同时写入
  • 这与之前学习的 parallelization 知识一致

局部状态(JokeState)

python
class JokeState(TypedDict):
    subject: str  # 单个笑话生成节点只需要知道主题

class Joke(BaseModel):
    joke: str

Python 知识点:Pydantic BaseModel

BaseModel 是 Pydantic 库的核心类,用于:

  • 数据验证
  • 类型检查
  • 结构化输出
python
# 使用示例
class Joke(BaseModel):
    joke: str

# LLM 会返回符合这个结构的数据
response = model.with_structured_output(Joke).invoke(prompt)
# response.joke 就是字符串类型的笑话内容

3. Map 阶段:生成子主题

python
def generate_topics(state: OverallState):
    prompt = subjects_prompt.format(topic=state["topic"])
    response = model.with_structured_output(Subjects).invoke(prompt)
    return {"subjects": response.subjects}

功能: 将用户输入的主题(如 "animals")分解为 3 个子主题(如 ["mammals", "reptiles", "birds"])


4. 动态任务分发:Send API ⭐

这是 Map-Reduce 的核心魔法

python
from langgraph.types import Send

def continue_to_jokes(state: OverallState):
    # 为每个子主题创建一个 Send 任务
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

Send API 详解:

python
Send("generate_joke", {"subject": s})
#    ^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^
#    目标节点名称      发送的状态数据

重要特性:

  1. 动态并行化:自动为列表中的每个元素创建并行任务
  2. 状态灵活性:可以发送任意状态,不需要与 OverallState 完全匹配
  3. 自动扩展:无论有 3 个还是 300 个子主题,都能自动并行处理

为什么强大?

传统方式需要预先知道有多少个并行任务,而 Send 可以根据运行时数据动态创建任务:

python
# 如果有 3 个主题,Send 会创建 3 个并行任务
subjects = ["mammals", "reptiles", "birds"]
# 相当于:
# - Send("generate_joke", {"subject": "mammals"})
# - Send("generate_joke", {"subject": "reptiles"})
# - Send("generate_joke", {"subject": "birds"})

# 如果有 10 个主题,自动创建 10 个并行任务!

5. Map 阶段:并行生成笑话

python
def generate_joke(state: JokeState):
    prompt = joke_prompt.format(subject=state["subject"])
    response = model.with_structured_output(Joke).invoke(prompt)
    return {"jokes": [response.joke]}

关键细节:

  • 输入:JokeState(只包含 subject
  • 输出:{"jokes": [response.joke]}(注意是列表!)
  • 返回的数据会被写回 OverallStatejokes 字段
  • 由于 jokesoperator.add reducer,多个并行节点的输出会自动拼接

执行流程示意:

generate_joke(subject="mammals")  → 返回 {"jokes": ["joke1"]}
generate_joke(subject="reptiles") → 返回 {"jokes": ["joke2"]}
generate_joke(subject="birds")    → 返回 {"jokes": ["joke3"]}

最终 OverallState.jokes = ["joke1", "joke2", "joke3"]

6. Reduce 阶段:选择最佳笑话

python
def best_joke(state: OverallState):
    # 将所有笑话合并为一个字符串
    jokes = "\n\n".join(state["jokes"])

    # 让 LLM 选择最好的笑话
    prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
    response = model.with_structured_output(BestJoke).invoke(prompt)

    # 返回被选中的笑话
    return {"best_selected_joke": state["jokes"][response.id]}

功能:

  • 接收所有并行生成的笑话
  • 使用 LLM 评估并选择最佳笑话
  • BestJoke 模型返回最佳笑话的索引(id)
  • 根据索引从 state["jokes"] 中取出最佳笑话

7. 构建图

python
from langgraph.graph import END, StateGraph, START

# 创建图
graph = StateGraph(OverallState)

# 添加节点
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)

# 添加边
graph.add_edge(START, "generate_topics")

# 条件边:使用 Send 动态分发任务
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])

graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)

# 编译
app = graph.compile()

# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))

LangGraph 知识点:条件边与 Send

python
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
#                          ^^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^^
#                          源节点             条件函数             可能的目标节点
  • continue_to_jokes 返回 Send 对象列表
  • LangGraph 会自动为每个 Send 创建一个到 generate_joke 的并行路径
  • 所有并行任务完成后,自动进入 best_joke 节点

8. 执行图

python
for s in app.stream({"topic": "animals"}):
    print(s)

输出示例:

python
{'generate_topics': {'subjects': ['mammals', 'reptiles', 'birds']}}

{'generate_joke': {'jokes': ["Why don't mammals ever get lost? Because they always follow their 'instincts'!"]}}
{'generate_joke': {'jokes': ["Why don't alligators like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why do birds fly south for the winter? Because it's too far to walk!"]}}

{'best_joke': {'best_selected_joke': "Why don't alligators like fast food? Because they can't catch it!"}}

观察要点:

  1. 首先生成 3 个子主题
  2. 然后并行生成 3 个笑话(注意输出顺序可能不同)
  3. 最后选出最佳笑话

🎓 核心知识点总结

LangGraph 特有概念

1. Send API

作用: 动态创建并行任务

python
Send(node_name, state_dict)

特点:

  • 运行时动态决定并行数量
  • 可以发送自定义状态(不必是完整的图状态)
  • 自动处理并行执行和结果聚合

2. Map-Reduce 模式

阶段作用节点数量执行方式
Map分解任务,生成结果动态(根据数据)并行
Reduce聚合结果1 个串行(等待所有 Map 完成)

3. 多状态设计

  • OverallState:全局状态,贯穿整个图
  • JokeState:局部状态,只用于特定节点
  • 通过 Send 可以在不同状态之间传递数据

4. 条件边 + Send

python
graph.add_conditional_edges(
    "source_node",           # 源节点
    condition_function,      # 返回 Send 列表的函数
    ["target_node"]         # 目标节点(Send 指向的节点)
)

Python 特有知识点

1. Pydantic BaseModel

python
from pydantic import BaseModel

class Joke(BaseModel):
    joke: str

# 用于 LLM 结构化输出
response = model.with_structured_output(Joke).invoke(prompt)

优势:

  • 自动类型验证
  • 清晰的数据结构
  • 与 LangChain 无缝集成

2. TypedDict vs BaseModel

特性TypedDictBaseModel
类型检查静态(IDE 提示)运行时验证
验证
用途状态定义数据模型、API 输出
性能更快(无验证)稍慢(有验证)
python
# TypedDict - 用于 State
class OverallState(TypedDict):
    topic: str

# BaseModel - 用于结构化输出
class Joke(BaseModel):
    joke: str

3. 列表推导式 + Send

python
# 创建多个 Send 任务的优雅方式
[Send("generate_joke", {"subject": s}) for s in subjects]

# 等价于:
result = []
for s in subjects:
    result.append(Send("generate_joke", {"subject": s}))

💡 最佳实践

1. 何时使用 Map-Reduce?

适用场景:

  • 需要对多个数据项执行相同操作(如批量翻译、批量摘要)
  • 任务可以自然分解为独立子任务(如分段处理文档)
  • 需要从多个候选结果中筛选(如生成多个答案选最佳)
  • 数据量大,需要并行加速(如分析多个数据源)

不适用场景:

  • 子任务之间有依赖关系
  • 无法自然分解的任务
  • 单一数据源的简单查询

2. Send API 使用技巧

技巧 1:动态控制并行数量

python
def continue_to_jokes(state: OverallState):
    # 可以根据条件过滤
    subjects = [s for s in state["subjects"] if len(s) > 3]
    return [Send("generate_joke", {"subject": s}) for s in subjects]

技巧 2:发送额外上下文

python
def continue_to_jokes(state: OverallState):
    return [
        Send("generate_joke", {
            "subject": s,
            "original_topic": state["topic"],  # 传递额外信息
            "style": "family-friendly"
        })
        for s in state["subjects"]
    ]

技巧 3:条件性发送

python
def continue_to_jokes(state: OverallState):
    sends = []
    for i, s in enumerate(state["subjects"]):
        if i < 5:  # 最多只处理前 5 个
            sends.append(Send("generate_joke", {"subject": s}))
    return sends

3. 状态设计原则

原则 1:最小化局部状态

python
# ✅ 好的设计 - 只包含必需字段
class JokeState(TypedDict):
    subject: str

# ❌ 不好的设计 - 包含不需要的字段
class JokeState(TypedDict):
    subject: str
    topic: str  # generate_joke 不需要这个
    jokes: list  # 也不需要这个

原则 2:使用 Reducer 聚合结果

python
# Map 节点的输出字段必须有 reducer
class OverallState(TypedDict):
    jokes: Annotated[list, operator.add]  # ✅ 正确
    # jokes: list  # ❌ 错误!并行更新会冲突

原则 3:清晰的状态流转

python
OverallState (完整状态)

Send → JokeState (子集)

返回 → OverallState.jokes (部分更新)

🚀 进阶技巧

1. 多层 Map-Reduce

可以嵌套多个 Map-Reduce 层次:

python
主题
 ↓ Map
子主题 (Level 1)
 ↓ Map
详细主题 (Level 2)
 ↓ Reduce
汇总子主题
 ↓ Reduce
最终结果

2. 带错误处理的 Map-Reduce

python
def generate_joke(state: JokeState):
    try:
        prompt = joke_prompt.format(subject=state["subject"])
        response = model.with_structured_output(Joke).invoke(prompt)
        return {"jokes": [response.joke]}
    except Exception as e:
        # 返回错误标记或默认值
        return {"jokes": [f"Error generating joke for {state['subject']}"]}

3. 限制并行度

虽然 Send 会自动并行,但有时需要限制同时执行的任务数(如 API 速率限制):

python
# 方法 1:在条件函数中限制
def continue_to_jokes(state: OverallState):
    # 只发送前 N 个
    max_parallel = 5
    subjects = state["subjects"][:max_parallel]
    return [Send("generate_joke", {"subject": s}) for s in subjects]

# 方法 2:分批处理(需要更复杂的图设计)

📊 Map-Reduce vs 简单并行

特性Map-Reduce (Send)简单并行 (Fan-out)
并行数量动态(运行时决定)静态(设计时固定)
状态传递灵活(可自定义)固定(使用图状态)
适用场景列表处理、批量任务固定数量的并行路径
复杂度中等
扩展性

示例对比:

python
# 简单并行 - 固定 3 个路径
builder.add_edge("start", "task1")
builder.add_edge("start", "task2")
builder.add_edge("start", "task3")

# Map-Reduce - 动态 N 个任务
def send_tasks(state):
    return [Send("task", {"data": d}) for d in state["data_list"]]
builder.add_conditional_edges("start", send_tasks, ["task"])

🎯 实际应用案例

案例 1:文档摘要

python
# Map: 为每个段落生成摘要
def summarize_chunk(state: ChunkState):
    summary = llm.invoke(f"Summarize: {state['chunk']}")
    return {"summaries": [summary]}

# Reduce: 合并所有段落摘要
def combine_summaries(state: OverallState):
    all_summaries = "\n".join(state["summaries"])
    final = llm.invoke(f"Create final summary: {all_summaries}")
    return {"final_summary": final}

案例 2:多语言翻译

python
# Map: 翻译到多种语言
def send_to_translate(state):
    languages = ["es", "fr", "de", "zh"]
    return [Send("translate", {"lang": lang, "text": state["text"]})
            for lang in languages]

# Reduce: 收集所有翻译
def collect_translations(state):
    return {"all_translations": state["translations"]}

案例 3:多角度分析

python
# Map: 从不同角度分析文本
def send_to_analyze(state):
    perspectives = ["technical", "business", "user", "security"]
    return [Send("analyze", {"perspective": p, "text": state["text"]})
            for p in perspectives]

# Reduce: 综合所有分析
def synthesize(state):
    combined = llm.invoke(f"Synthesize these analyses: {state['analyses']}")
    return {"final_analysis": combined}

📖 扩展阅读


🔍 常见问题

Q1: Send 和普通边有什么区别?

普通边: 静态路由,设计时确定

python
graph.add_edge("A", "B")  # A 总是流向 B

Send: 动态路由,运行时确定

python
# 根据数据动态创建多个并行任务
return [Send("B", data) for data in dynamic_data]

Q2: 为什么 generate_joke 返回 {"jokes": [joke]} 而不是 {"jokes": joke}

因为 OverallState.jokes 使用了 operator.add reducer,它期望操作列表:

python
# 正确 ✅
operator.add(["joke1"], ["joke2"])  # → ["joke1", "joke2"]

# 错误 ❌
operator.add("joke1", "joke2")  # → "joke1joke2" (字符串拼接)

Q3: 可以在 Send 中发送完全不同的状态吗?

可以!Send 发送的状态不需要与 OverallState 匹配:

python
# OverallState 有 topic, subjects, jokes
# 但 Send 可以发送任意结构
Send("generate_joke", {"subject": "cats", "style": "silly"})

只要目标节点(generate_joke)能处理这个状态即可。


总结:Map-Reduce 是 LangGraph 中处理批量、并行任务的强大模式。通过 Send API,我们可以动态创建任意数量的并行任务,然后用 reducer 优雅地聚合结果。这是构建可扩展、高性能 AI 应用的关键技术!


完整案例代码(可直接运行)

以下是一个完整的 Map-Reduce 示例,展示了笑话生成系统的动态并行处理:

python
"""
LangGraph Map-Reduce 完整示例
演示 Send API 动态创建并行任务

场景:智能笑话生成系统
1. Map 阶段:根据主题生成多个子主题的笑话
2. Reduce 阶段:从所有笑话中选出最好的一个
"""

import operator
from typing import Annotated, List
from typing_extensions import TypedDict
from pydantic import BaseModel
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from langchain_openai import ChatOpenAI

# ========== 1. 数据模型定义 ==========

class Subjects(BaseModel):
    """子主题列表模型"""
    subjects: List[str]

class Joke(BaseModel):
    """笑话模型"""
    joke: str

class BestJoke(BaseModel):
    """最佳笑话选择模型"""
    id: int  # 最佳笑话的索引

# ========== 2. 状态定义 ==========

# 全局状态(贯穿整个图)
class OverallState(TypedDict):
    topic: str                                    # 用户输入的主题
    subjects: List[str]                           # 生成的子主题列表
    jokes: Annotated[List[str], operator.add]     # 所有生成的笑话(支持并行追加)
    best_selected_joke: str                       # 最终选出的最佳笑话

# 局部状态(只用于 Map 节点)
class JokeState(TypedDict):
    subject: str  # 单个笑话生成节点只需要知道子主题

# ========== 3. 初始化 LLM ==========

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

# ========== 4. 提示词模板 ==========

subjects_prompt = """请根据以下主题生成 3 个相关的子主题,用于创作笑话。

主题: {topic}

要求:
- 每个子主题应该是具体的、有趣的
- 子主题之间应该有一定差异性
- 适合创作幽默内容

请直接返回 3 个子主题。"""

joke_prompt = """请创作一个关于 "{subject}" 的笑话。

要求:
- 笑话要简短有趣
- 适合所有年龄段
- 尽量出人意料的结局

请直接返回笑话内容。"""

best_joke_prompt = """以下是几个关于 "{topic}" 的笑话,请选出最好的一个。

笑话列表:
{jokes}

请返回最好笑话的编号(从 0 开始)。"""

# ========== 5. 定义节点函数 ==========

def generate_topics(state: OverallState):
    """
    生成子主题(Map 之前的准备步骤)
    将用户输入的主题分解为多个子主题
    """
    topic = state["topic"]

    prompt = subjects_prompt.format(topic=topic)
    response = llm.with_structured_output(Subjects).invoke(prompt)

    print(f"🎯 主题: {topic}")
    print(f"📝 生成了 {len(response.subjects)} 个子主题: {response.subjects}")

    return {"subjects": response.subjects}

def continue_to_jokes(state: OverallState):
    """
    动态任务分发函数(Map 的核心)
    为每个子主题创建一个 Send 任务
    """
    subjects = state["subjects"]

    # ⭐ 关键:使用 Send API 动态创建并行任务
    sends = [Send("generate_joke", {"subject": s}) for s in subjects]

    print(f"📤 分发 {len(sends)} 个笑话生成任务")
    return sends

def generate_joke(state: JokeState):
    """
    生成单个笑话(Map 节点)
    每个并行任务独立执行
    """
    subject = state["subject"]

    prompt = joke_prompt.format(subject=subject)
    response = llm.with_structured_output(Joke).invoke(prompt)

    print(f"😂 生成笑话 - 主题: {subject}")

    # 返回列表!因为 jokes 字段使用 operator.add 作为 reducer
    return {"jokes": [response.joke]}

def best_joke(state: OverallState):
    """
    选择最佳笑话(Reduce 节点)
    收集所有并行生成的笑话,选出最好的一个
    """
    topic = state["topic"]
    jokes = state["jokes"]

    # 格式化笑话列表
    jokes_formatted = "\n\n".join([
        f"笑话 {i}:\n{joke}"
        for i, joke in enumerate(jokes)
    ])

    prompt = best_joke_prompt.format(topic=topic, jokes=jokes_formatted)
    response = llm.with_structured_output(BestJoke).invoke(prompt)

    # 获取被选中的笑话
    selected_joke = jokes[response.id]

    print(f"🏆 选出最佳笑话(编号 {response.id})")
    return {"best_selected_joke": selected_joke}

# ========== 6. 构建图 ==========

def build_joke_generator():
    """构建笑话生成图"""

    builder = StateGraph(OverallState)

    # 添加节点
    builder.add_node("generate_topics", generate_topics)
    builder.add_node("generate_joke", generate_joke)
    builder.add_node("best_joke", best_joke)

    # 添加边
    builder.add_edge(START, "generate_topics")

    # ⭐ 条件边:使用 Send 动态分发任务
    builder.add_conditional_edges(
        "generate_topics",       # 源节点
        continue_to_jokes,       # 返回 Send 列表的函数
        ["generate_joke"]        # 可能的目标节点
    )

    # 所有 Map 任务完成后,进入 Reduce 节点
    builder.add_edge("generate_joke", "best_joke")
    builder.add_edge("best_joke", END)

    return builder.compile()

# ========== 7. 主程序 ==========

if __name__ == "__main__":
    # 构建图
    graph = build_joke_generator()

    # 可视化图结构
    print("=" * 60)
    print("📊 图结构可视化")
    print("=" * 60)

    try:
        from IPython.display import Image, display
        display(Image(graph.get_graph().draw_mermaid_png()))
    except Exception:
        print(graph.get_graph().draw_mermaid())

    # 执行测试
    print("\n" + "=" * 60)
    print("🚀 执行笑话生成系统")
    print("=" * 60 + "\n")

    topic = "动物"

    # 使用 stream 模式观察执行过程
    print("⏳ 执行中...\n")

    for step in graph.stream({"topic": topic}):
        for node_name, output in step.items():
            if node_name == "generate_joke":
                print(f"  📌 笑话生成完成")
            else:
                print(f"📌 节点 [{node_name}] 完成")

    # 获取最终结果
    result = graph.invoke({"topic": topic})

    # 显示结果
    print("\n" + "=" * 60)
    print("📋 最终结果")
    print("=" * 60)

    print(f"\n🎯 主题: {result['topic']}")
    print(f"\n📝 子主题: {result['subjects']}")

    print(f"\n😂 所有生成的笑话:")
    for i, joke in enumerate(result['jokes']):
        print(f"\n  笑话 {i}:")
        print(f"  {joke}")

    print(f"\n🏆 最佳笑话:")
    print(f"  {result['best_selected_joke']}")

    # 测试不同主题
    print("\n" + "=" * 60)
    print("🔄 测试其他主题")
    print("=" * 60 + "\n")

    for test_topic in ["科技", "美食"]:
        print(f"主题: {test_topic}")
        result = graph.invoke({"topic": test_topic})
        print(f"最佳笑话: {result['best_selected_joke'][:50]}...\n")

运行结果示例

============================================================
📊 图结构可视化
============================================================
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
    __start__([__start__]):::startclass;
    generate_topics([generate_topics]):::otherclass;
    generate_joke([generate_joke]):::otherclass;
    best_joke([best_joke]):::otherclass;
    __end__([__end__]):::endclass;
    __start__ --> generate_topics;
    generate_topics -.-> generate_joke;
    generate_joke --> best_joke;
    best_joke --> __end__;

============================================================
🚀 执行笑话生成系统
============================================================

⏳ 执行中...

🎯 主题: 动物
📝 生成了 3 个子主题: ['猫', '狗', '大象']
📤 分发 3 个笑话生成任务
📌 节点 [generate_topics] 完成
😂 生成笑话 - 主题: 猫
  📌 笑话生成完成
😂 生成笑话 - 主题: 狗
  📌 笑话生成完成
😂 生成笑话 - 主题: 大象
  📌 笑话生成完成
🏆 选出最佳笑话(编号 1)
📌 节点 [best_joke] 完成

============================================================
📋 最终结果
============================================================

🎯 主题: 动物

📝 子主题: ['猫', '狗', '大象']

😂 所有生成的笑话:

  笑话 0:
  为什么猫总是看起来很傲慢?因为它们知道每次你叫它们的名字,它们选择不回应!

  笑话 1:
  狗为什么不会用电脑?因为它们总是把"Bark"错打成"Bytes"!

  笑话 2:
  大象为什么从来不玩捉迷藏?因为它太大了,藏哪都被发现!

🏆 最佳笑话:
  狗为什么不会用电脑?因为它们总是把"Bark"错打成"Bytes"!

============================================================
🔄 测试其他主题
============================================================

主题: 科技
最佳笑话: 为什么程序员总是穿灰色衣服?因为他们不想处理"颜色"异常!...

主题: 美食
最佳笑话: 为什么披萨从来不说谎?因为它总是"薄饼"坦白!...

核心知识点回顾

概念说明代码示例
Send API动态创建并行任务Send("node_name", {"key": value})
动态分发运行时决定并行数量[Send(...) for item in items]
OverallState全局状态,包含所有字段jokes: Annotated[List[str], operator.add]
局部 StateMap 节点只需要的字段class JokeState: subject: str
条件边 + Send实现动态 Mapadd_conditional_edges(src, send_func, [targets])
Reducer合并并行结果operator.add 拼接列表
Map 阶段并行处理每个子任务generate_joke 节点
Reduce 阶段聚合所有结果best_joke 节点

基于 MIT 许可证发布。内容版权归作者所有。