Skip to content

LangGraph 子图(Sub-graph)详细解读

📋 案例概览

这个案例展示了 LangGraph 中**子图(Sub-graph)**的核心概念和使用方法。虽然代码相对简单,但它揭示了一个非常重要的架构模式:如何在复杂系统中管理不同的状态空间

业务场景

想象你在运营一个问答系统,需要定期分析用户日志:

  1. 日志清洗:原始日志需要预处理
  2. 失败分析:找出回答质量差的记录,生成改进报告
  3. 问题总结:分析用户最关心的主题,发送到 Slack 通知团队

核心挑战

这两个分析任务(失败分析 & 问题总结)需要:

  • 并行执行以提高效率
  • 独立的状态空间(各自有不同的中间变量)
  • 共享输入数据(都需要访问清洗后的日志)
  • 汇总输出结果到主图

子图正是解决这个问题的最佳方案!


🗺️ 整体架构图

用户输入原始日志(raw_logs)

    ┌─────────┐
    │clean_logs│  清洗日志
    └─────────┘
         ↓ cleaned_logs(共享数据)
    ┌────┴────┐
    ↓         ↓
┌──────┐  ┌──────┐
│失败分析│  │问题总结│  (两个独立子图并行运行)
│子图   │  │子图   │
└──────┘  └──────┘
    ↓         ↓
fa_summary  report
    └────┬────┘

     最终结果汇总

📚 代码分段详解

第一部分:定义数据结构

Log 数据模型

python
from typing_extensions import TypedDict
from typing import List, Optional

class Log(TypedDict):
    id: str              # 日志 ID
    question: str        # 用户问题
    docs: Optional[List] # 检索到的文档(可选)
    answer: str          # 系统回答
    grade: Optional[int] # 质量评分(可选)
    grader: Optional[str]# 评分器名称(可选)
    feedback: Optional[str] # 反馈信息(可选)

Python 知识点

  1. TypedDict:这是 Python 3.8+ 引入的类型,用于定义字典的结构

    python
    # 普通字典 - 不知道有哪些键
    log1 = {"id": "1", "question": "xxx"}
    
    # TypedDict - 明确定义了键和类型
    log2: Log = {"id": "1", "question": "xxx", ...}
  2. Optional[类型]:表示这个字段可以是指定类型,也可以是 None

    python
    grade: Optional[int]  # 等价于 grade: int | None

为什么这样设计?

  • 并非所有日志都有评分(grade),只有被评估过的才有
  • 这种灵活的数据结构适配实际业务场景

第二部分:失败分析子图

状态定义

python
# 子图内部状态(完整状态)
class FailureAnalysisState(TypedDict):
    cleaned_logs: List[Log]   # 输入:清洗后的日志
    failures: List[Log]       # 中间变量:失败的日志
    fa_summary: str           # 输出:失败分析摘要
    processed_logs: List[str] # 输出:处理过的日志 ID

# 子图输出状态(只包含需要返回给主图的字段)
class FailureAnalysisOutputState(TypedDict):
    fa_summary: str           # 只输出摘要
    processed_logs: List[str] # 只输出处理记录

关键概念:为什么需要两个状态?

  1. 内部状态(FailureAnalysisState)

    • 包含所有中间变量(如 failures
    • 用于子图内部节点之间传递数据
  2. 输出状态(FailureAnalysisOutputState)

    • 只包含需要返回给主图的字段
    • 避免不必要的数据传递
    • 减少状态冲突的可能性

类比

  • 内部状态 = 厨房里的所有工具和原料
  • 输出状态 = 端给客人的成品菜肴

节点函数

1. get_failures - 过滤失败日志

python
def get_failures(state):
    """ 获取包含失败记录的日志 """
    cleaned_logs = state["cleaned_logs"]

    # 筛选出有 grade 字段的日志(表示被评分过)
    failures = [log for log in cleaned_logs if "grade" in log]

    return {"failures": failures}

Python 知识点 - 列表推导式

python
# 传统写法
failures = []
for log in cleaned_logs:
    if "grade" in log:
        failures.append(log)

# 列表推导式(更简洁)
failures = [log for log in cleaned_logs if "grade" in log]

业务逻辑

  • 只有被评估过的日志才有 grade 字段
  • 这些日志通常是因为质量问题需要人工审核的

2. generate_summary - 生成摘要

python
def generate_summary(state):
    """ 生成失败分析摘要 """
    failures = state["failures"]

    # 实际应用中,这里会调用 LLM 分析失败原因
    # fa_summary = summarize_with_llm(failures)
    fa_summary = "Poor quality retrieval of Chroma documentation."

    # 记录处理过的日志 ID
    processed_logs = [
        f"failure-analysis-on-log-{failure['id']}"
        for failure in failures
    ]

    return {
        "fa_summary": fa_summary,
        "processed_logs": processed_logs
    }

技术细节

  1. f-string 格式化字符串(Python 3.6+):

    python
    failure_id = "123"
    
    # 传统方式
    msg = "failure-analysis-on-log-" + failure_id
    
    # f-string(更直观)
    msg = f"failure-analysis-on-log-{failure_id}"
  2. processed_logs 的作用

    • 追踪哪些日志被处理过
    • 用于审计和调试
    • 主图会收集所有子图的处理记录

构建子图

python
from langgraph.graph import StateGraph, START, END

fa_builder = StateGraph(
    state_schema=FailureAnalysisState,        # 内部状态
    output_schema=FailureAnalysisOutputState  # 输出状态
)

# 添加节点
fa_builder.add_node("get_failures", get_failures)
fa_builder.add_node("generate_summary", generate_summary)

# 添加边(定义执行顺序)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)

执行流程

START → get_failures → generate_summary → END

LangGraph 概念

  • START:特殊节点,表示图的入口
  • END:特殊节点,表示图的出口
  • add_edge(A, B):从节点 A 到节点 B 的有向边

第三部分:问题总结子图

状态定义

python
# 内部状态
class QuestionSummarizationState(TypedDict):
    cleaned_logs: List[Log]   # 输入
    qs_summary: str           # 中间变量:问题摘要
    report: str               # 输出:最终报告
    processed_logs: List[str] # 输出:处理记录

# 输出状态
class QuestionSummarizationOutputState(TypedDict):
    report: str               # 只输出报告
    processed_logs: List[str] # 只输出处理记录

注意qs_summary 是中间变量,不需要返回给主图


节点函数

1. generate_summary - 生成问题摘要

python
def generate_summary(state):
    cleaned_logs = state["cleaned_logs"]

    # 实际会调用 LLM 分析用户问题的主题
    summary = "Questions focused on usage of ChatOllama and Chroma vector store."

    # 记录处理的日志
    processed_logs = [
        f"summary-on-log-{log['id']}"
        for log in cleaned_logs
    ]

    return {
        "qs_summary": summary,
        "processed_logs": processed_logs
    }

2. send_to_slack - 发送报告

python
def send_to_slack(state):
    qs_summary = state["qs_summary"]

    # 实际会基于摘要生成完整报告并发送到 Slack
    # report = generate_report(qs_summary)
    # send_to_slack_api(report)
    report = "foo bar baz"

    return {"report": report}

业务流程

清洗日志 → 生成摘要 → 格式化报告 → 发送到 Slack

构建子图

python
qs_builder = StateGraph(
    QuestionSummarizationState,
    output_schema=QuestionSummarizationOutputState
)

qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)

qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)

第四部分:主图 - 整合子图

主图状态

python
from operator import add
from typing import Annotated

class EntryGraphState(TypedDict):
    raw_logs: List[Log]                      # 输入:原始日志
    cleaned_logs: List[Log]                  # 清洗后的日志
    fa_summary: str                          # 来自失败分析子图
    report: str                              # 来自问题总结子图
    processed_logs: Annotated[List[int], add] # 来自两个子图,需要合并

核心技术点 - Annotated 和 Reducer

python
processed_logs: Annotated[List[int], add]

这行代码包含了三个重要概念:

  1. Annotated:Python 3.9+ 的类型注解增强工具

    python
    from typing import Annotated
    
    # 基本类型注解
    name: str
    
    # 带额外元数据的注解
    age: Annotated[int, "must be positive"]
  2. Reducer(合并器)add 来自 operator 模块

    python
    from operator import add
    
    # add 的作用是合并列表
    result = add([1, 2], [3, 4])  # 结果:[1, 2, 3, 4]
  3. 为什么需要 Reducer?

    因为两个子图并行运行,都会返回 processed_logs

    python
    # 失败分析子图返回
    {"processed_logs": ["failure-analysis-on-log-2"]}
    
    # 问题总结子图返回
    {"processed_logs": ["summary-on-log-1", "summary-on-log-2"]}
    
    # LangGraph 需要知道如何合并这两个结果
    # 使用 add 后:
    {"processed_logs": [
        "failure-analysis-on-log-2",
        "summary-on-log-1",
        "summary-on-log-2"
    ]}

常见的 Reducer

python
from operator import add

# 列表合并
Annotated[List[str], add]  # [1, 2] + [3] = [1, 2, 3]

# 自定义 Reducer
def merge_dicts(a, b):
    return {**a, **b}

Annotated[dict, merge_dicts]

为什么 cleaned_logs 不需要 Reducer?

问题cleaned_logs 也会被两个子图访问,为什么不需要 Annotated

答案:因为我们定义了 output_schema

python
# 失败分析子图的输出状态(不包含 cleaned_logs)
class FailureAnalysisOutputState(TypedDict):
    fa_summary: str
    processed_logs: List[str]
    # 注意:没有 cleaned_logs

# 问题总结子图的输出状态(也不包含 cleaned_logs)
class QuestionSummarizationOutputState(TypedDict):
    report: str
    processed_logs: List[str]
    # 注意:也没有 cleaned_logs

机制解释

  1. 没有 output_schema 时

    python
    StateGraph(MyState)  # 所有状态字段都会输出
    • 子图会返回所有状态字段
    • cleaned_logs 会被两个子图都返回
    • 需要 Reducer 来合并
  2. 有 output_schema 时

    python
    StateGraph(MyState, output_schema=MyOutputState)
    • 子图只返回 MyOutputState 中定义的字段
    • cleaned_logs 不在输出状态中,不会返回
    • 不需要 Reducer

类比

  • 没有 output_schema = 函数返回所有局部变量
  • 有 output_schema = 函数只返回你指定的值

主图节点和构建

clean_logs 节点

python
def clean_logs(state):
    """ 清洗日志 """
    raw_logs = state["raw_logs"]

    # 实际应用中会进行数据清洗:
    # - 去重
    # - 格式标准化
    # - 过滤无效数据
    cleaned_logs = raw_logs  # 简化示例

    return {"cleaned_logs": cleaned_logs}

构建主图

python
entry_builder = StateGraph(EntryGraphState)

# 添加常规节点
entry_builder.add_node("clean_logs", clean_logs)

# 关键:将编译后的子图作为节点添加
entry_builder.add_node(
    "question_summarization",
    qs_builder.compile()  # 编译子图
)

entry_builder.add_node(
    "failure_analysis",
    fa_builder.compile()  # 编译子图
)

# 定义执行流程
entry_builder.add_edge(START, "clean_logs")
entry_builder.add_edge("clean_logs", "failure_analysis")
entry_builder.add_edge("clean_logs", "question_summarization")
entry_builder.add_edge("failure_analysis", END)
entry_builder.add_edge("question_summarization", END)

# 编译主图
graph = entry_builder.compile()

关键点

  1. 子图也是节点

    python
    entry_builder.add_node("子图名称", 子图实例.compile())
  2. 并行执行

    python
    entry_builder.add_edge("clean_logs", "failure_analysis")
    entry_builder.add_edge("clean_logs", "question_summarization")

    clean_logs 出发有两条边,两个子图会并行运行

  3. xray 参数

    python
    graph.get_graph(xray=1).draw_mermaid_png()

    xray=1 会展开子图的内部结构,方便调试


第五部分:运行示例

准备测试数据

python
# 正常的问答日志
question_answer = Log(
    id="1",
    question="How can I import ChatOllama?",
    answer="To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'",
    # 没有 grade 字段,表示质量正常
)

# 有质量问题的日志
question_answer_feedback = Log(
    id="2",
    question="How can I use Chroma vector store?",
    answer="To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).",
    grade=0,  # 评分为 0,表示质量差
    grader="Document Relevance Recall",
    feedback="The retrieved documents discuss vector stores in general, but not Chroma specifically",
)

raw_logs = [question_answer, question_answer_feedback]

执行图

python
result = graph.invoke({"raw_logs": raw_logs})

输出结果

python
{
    'raw_logs': [...],  # 原始输入

    'cleaned_logs': [...],  # clean_logs 节点的输出

    'fa_summary': 'Poor quality retrieval of Chroma documentation.',
    # ↑ 来自失败分析子图

    'report': 'foo bar baz',
    # ↑ 来自问题总结子图

    'processed_logs': [
        'failure-analysis-on-log-2',   # 失败分析处理了 log-2
        'summary-on-log-1',            # 问题总结处理了 log-1
        'summary-on-log-2'             # 问题总结处理了 log-2
    ]
    # ↑ 两个子图的 processed_logs 通过 add 合并
}

🔑 核心技术解析

1. 子图的本质

子图是什么?

  • 子图本身就是一个完整的 StateGraph
  • 可以独立运行和测试
  • 通过 compile() 编译后,可以作为节点嵌入其他图

类比

  • 主图 = 一个公司
  • 子图 = 公司内的部门
  • 每个部门有自己的工作流程(内部状态)
  • 部门只向公司汇报关键结果(输出状态)

2. 状态通信机制

主图 → 子图(输入)

python
class EntryGraphState(TypedDict):
    cleaned_logs: List[Log]  # 主图有这个字段

class FailureAnalysisState(TypedDict):
    cleaned_logs: List[Log]  # 子图也有这个字段

当主图调用子图时,LangGraph 会自动传递同名字段

python
# 主图状态
{"cleaned_logs": [...]}

# 自动传递给子图
# 子图可以访问 state["cleaned_logs"]

子图 → 主图(输出)

python
class FailureAnalysisOutputState(TypedDict):
    fa_summary: str  # 子图输出这个字段

class EntryGraphState(TypedDict):
    fa_summary: str  # 主图有同名字段接收

子图返回的字段会更新主图状态中的同名字段。


关键原则

同名字段 = 通信通道

  • 主图和子图通过同名字段传递数据
  • 不同名的字段互不干扰

输入 = 状态交集

  • 子图只能访问与主图同名的输入字段

输出 = output_schema

  • 子图只返回 output_schema 中定义的字段
  • 减少不必要的数据传递

3. 并行子图的结果合并

场景:两个子图并行运行,都返回同名字段

python
# 子图 A 返回
{"processed_logs": ["a", "b"]}

# 子图 B 返回
{"processed_logs": ["c", "d"]}

# 如何合并?

解决方案 1:使用 Reducer

python
from operator import add

class State(TypedDict):
    processed_logs: Annotated[List[str], add]  # 指定合并方式

结果:{"processed_logs": ["a", "b", "c", "d"]}


解决方案 2:使用不同的字段名

python
class EntryState(TypedDict):
    fa_processed_logs: List[str]  # 失败分析的记录
    qs_processed_logs: List[str]  # 问题总结的记录

class FAOutputState(TypedDict):
    fa_processed_logs: List[str]  # 不同名

class QSOutputState(TypedDict):
    qs_processed_logs: List[str]  # 不同名

这样两个子图返回不同的字段,不需要合并。


4. 何时使用子图?

适合使用子图的场景

独立的业务逻辑

  • 每个子图负责一个完整的子任务
  • 有清晰的输入和输出边界

需要状态隔离

  • 子图的中间变量不应该污染主图状态
  • 例如:failures 只在失败分析子图内使用

可复用的流程

  • 同一个子图可以在不同的主图中复用
  • 例如:日志清洗子图可用于多个场景

并行执行

  • 多个独立任务需要同时运行
  • 通过 Reducer 合并结果

不适合使用子图的场景

简单的线性流程

  • 如果只是几个顺序执行的节点,不需要子图

状态高度耦合

  • 如果节点之间需要频繁共享中间状态

过度设计

  • 不要为了用子图而用子图

💡 Python 知识点总结

1. TypedDict

python
from typing_extensions import TypedDict

# 定义字典结构
class Person(TypedDict):
    name: str
    age: int

# 使用
person: Person = {"name": "Alice", "age": 30}

优势

  • 提供类型检查
  • IDE 自动补全
  • 清晰的数据结构文档

2. Optional 类型

python
from typing import Optional

# 表示可选字段
grade: Optional[int]  # 可以是 int 或 None

# 等价于
grade: int | None  # Python 3.10+

3. Annotated 元数据

python
from typing import Annotated

# 基本用法
Age = Annotated[int, "must be positive"]

# LangGraph 用法
from operator import add
Logs = Annotated[List[str], add]  # 指定合并方式

4. 列表推导式

python
# 基本形式
result = [expression for item in iterable if condition]

# 示例
numbers = [1, 2, 3, 4, 5]
even = [n for n in numbers if n % 2 == 0]  # [2, 4]
squared = [n**2 for n in numbers]  # [1, 4, 9, 16, 25]

5. f-string 格式化

python
name = "Alice"
age = 30

# 基本用法
msg = f"Name: {name}, Age: {age}"

# 表达式
msg = f"Next year: {age + 1}"

# 格式控制
pi = 3.14159
msg = f"Pi: {pi:.2f}"  # "Pi: 3.14"

6. operator 模块

python
from operator import add

# 列表合并
add([1, 2], [3, 4])  # [1, 2, 3, 4]

# 数字相加
add(10, 20)  # 30

# 其他常用操作符
from operator import mul, sub
mul(3, 4)  # 12
sub(10, 3)  # 7

🎯 实战建议

对于初学者

  1. 先理解状态通信

    • 画出主图和子图的状态结构
    • 标注哪些字段是共享的
  2. 从简单开始

    • 先构建一个子图
    • 测试通过后再添加第二个
    • 最后处理并行和合并
  3. 使用可视化

    python
    # 查看子图结构
    fa_graph.get_graph().draw_mermaid_png()
    
    # 查看主图结构(展开子图)
    main_graph.get_graph(xray=1).draw_mermaid_png()
  4. 调试技巧

    • 先单独测试每个子图
    • 确保子图能独立运行
    • 再集成到主图中

对于进阶使用

  1. 优化状态设计

    • 只在 output_schema 中包含必要字段
    • 使用有意义的字段名避免冲突
    • 合理使用 Reducer
  2. 错误处理

    python
    def safe_node(state):
        try:
            return process(state)
        except Exception as e:
            return {"error": str(e)}
  3. 性能优化

    • 利用并行执行提高效率
    • 避免传递大量不必要的数据
    • 使用 xray 参数分析执行路径
  4. 可测试性

    python
    # 单独测试子图
    fa_graph = fa_builder.compile()
    result = fa_graph.invoke({"cleaned_logs": test_logs})
    
    # 测试主图
    main_graph = entry_builder.compile()
    result = main_graph.invoke({"raw_logs": test_logs})

📌 总结

核心概念

  1. 子图是独立的图

    • 有自己的状态空间
    • 可以独立运行和测试
    • 通过 compile() 后可嵌入其他图
  2. 状态通信通过同名字段

    • 主图 → 子图:自动传递同名输入字段
    • 子图 → 主图:返回 output_schema 中的字段
  3. 并行子图需要处理结果合并

    • 使用 Annotated + Reducer
    • 或使用不同的字段名

设计原则

单一职责:每个子图负责一个独立任务 ✅ 状态隔离:中间变量不暴露给主图 ✅ 清晰接口:明确定义输入和输出状态 ✅ 可复用性:子图可在多个场景中使用

适用场景

  • 多代理系统(每个代理是一个子图)
  • 复杂业务流程的模块化
  • 需要并行执行的独立任务
  • 需要状态隔离的场景

🚀 扩展练习

  1. 添加第三个子图

    • 创建一个"性能监控"子图
    • 分析日志的响应时间
    • 并行运行三个子图
  2. 实现错误处理

    • 在子图中添加错误捕获
    • 将错误信息返回给主图
    • 主图根据错误决定是否继续
  3. 嵌套子图

    • 在子图内部再嵌套子图
    • 理解多层状态传递机制
  4. 动态子图选择

    • 根据输入数据决定调用哪些子图
    • 实现条件并行

最后的话:子图是 LangGraph 实现模块化和可扩展性的关键机制。虽然这个案例很简单,但它展示的"状态隔离"和"并行合并"思想在构建复杂的多代理系统时至关重要。掌握子图,你就掌握了构建大型 AI 系统的基础!


🧠 核心概念深度解读

"独立的状态空间"通俗解释

场景类比:公司办公室

想象一个公司有三个办公室:

  1. 前台办公室(主图)

    • 负责接待客户、分配任务
    • 有一个公共白板记录:客户信息、任务进度、最终结果
  2. 技术部办公室(子图 A)

    • 有自己的小白板:记录代码 bug、修复方案、技术文档
    • 这些中间过程前台看不到也不关心
    • 完成后只告诉前台:"修好了,这是修复报告"
  3. 市场部办公室(子图 B)

    • 有自己的小白板:记录调研数据、分析草稿、竞品资料
    • 这些中间过程前台看不到也不关心
    • 完成后只告诉前台:"搞定了,这是市场报告"

独立的状态空间 = 每个办公室有自己的小白板

python
# 主图状态(前台的公共白板)
class MainState:
    客户需求: str           # 前台记录
    技术修复报告: str       # 技术部提交的
    市场分析报告: str       # 市场部提交的

# 技术部子图状态(技术部的小白板)
class TechSubGraphState:
    客户需求: str           # 从前台复制过来的
    bug列表: List[str]     # 只在技术部用,前台不知道
    修复代码: str          # 只在技术部用,前台不知道
    技术修复报告: str       # 要提交给前台的

# 市场部子图状态(市场部的小白板)
class MarketSubGraphState:
    客户需求: str           # 从前台复制过来的
    调研数据: List[dict]   # 只在市场部用,前台不知道
    竞品分析: str          # 只在市场部用,前台不知道
    市场分析报告: str       # 要提交给前台的

为什么需要独立空间?

没有独立空间的问题

python
# 如果都用同一个白板(共享状态)
全公司共享状态 = {
    "客户需求": "...",
    "bug列表": [...],        # 技术部的
    "修复代码": "...",       # 技术部的
    "调研数据": [...],       # 市场部的
    "竞品分析": "...",       # 市场部的
    # 全混在一起,乱套了!
}

问题:

  • 前台不关心技术细节,却要看到 bug列表修复代码
  • 技术部不关心市场数据,却要看到 调研数据竞品分析
  • 字段名可能冲突(两个部门都想用 临时笔记 这个名字)
  • 状态臃肿,管理困难

有独立空间的好处

python
# 技术部只关心自己的事
tech_state = {
    "客户需求": "...",      # 输入
    "bug列表": [...],       # 内部工作
    "修复代码": "...",      # 内部工作
    "技术修复报告": "..."   # 输出
}

# 市场部只关心自己的事
market_state = {
    "客户需求": "...",      # 输入
    "调研数据": [...],      # 内部工作
    "竞品分析": "...",      # 内部工作
    "市场分析报告": "..."   # 输出
}

# 前台只看最终结果
main_state = {
    "客户需求": "...",
    "技术修复报告": "...",  # 来自技术部
    "市场分析报告": "..."   # 来自市场部
}

好处:

  • 职责清晰:每个部门管好自己的事
  • 避免污染:中间变量不会泄露到其他地方
  • 可并行:技术部和市场部可以同时工作,互不干扰
  • 易维护:修改技术部流程不影响市场部

LangGraph 的 State 核心概念(大白话版)

State 是什么?

State 就是一个数据流水本,记录了从开始到现在所有节点产生的数据。

核心比喻:State = 信息广场

想象一个城市的中心广场,所有人(节点)都会来这里:

  • 📢 发布消息:把自己处理的结果贴到广场的公告板上
  • 👀 查看消息:看看别人贴了什么信息,作为自己工作的输入
  • 🔄 信息汇总:所有人的工作成果都汇集在这个广场
         信息广场(State)
    ┌────────────────────────┐
    │  用户输入: "..."        │  ← 初始信息
    │  节点A的结果: "..."     │  ← 节点A 贴上去的
    │  节点B的结果: "..."     │  ← 节点B 贴上去的
    │  节点C的结果: "..."     │  ← 节点C 贴上去的
    │  最终答案: "..."        │  ← 最后一个节点总结的
    └────────────────────────┘
           ↑    ↑    ↑    ↑
           │    │    │    │
        节点A 节点B 节点C 节点D
      (来贴)(来看)(来贴)(来看)

关键特点

  1. 中心化汇聚

    • 所有节点产生的信息都汇总到 State 这个广场
    • 不是点对点传递,而是"我贴公告 → 你来看公告"
  2. 信息持久化

    • 贴在广场上的信息不会消失(除非被覆盖)
    • 后面的节点能看到前面所有节点的信息
  3. 异步协作

    • 节点A 不需要等节点B 看完才继续
    • 各自把信息贴上去就行,有依赖关系的节点自然会来看
  4. 全局可见

    • 任何节点都能看到广场上的所有信息
    • 但实际上,节点只关心自己需要的那部分

信息广场的运作机制

场景:多人协作写报告

python
# 广场上的公告板(State)
class 报告State(TypedDict):
    老板需求: str          # 老板发布的
    市场数据: dict         # 市场部贴的
    技术方案: str          # 技术部贴的
    财务预算: float        # 财务部贴的
    最终报告: str          # 汇总人员生成的

执行流程

1. 老板来广场,贴了个需求:
   State = {"老板需求": "做个新产品分析"}

2. 市场部来广场看需求,做完调研后贴结果:
   State = {
       "老板需求": "做个新产品分析",
       "市场数据": {"用户量": 1000, "竞品": ["A", "B"]}  ← 新增
   }

3. 技术部来广场看需求和市场数据,贴技术方案:
   State = {
       "老板需求": "做个新产品分析",
       "市场数据": {...},
       "技术方案": "使用 Python + React"  ← 新增
   }

4. 财务部来广场看技术方案,贴预算:
   State = {
       "老板需求": "做个新产品分析",
       "市场数据": {...},
       "技术方案": "使用 Python + React",
       "财务预算": 100000.0  ← 新增
   }

5. 汇总人员来广场看所有信息,生成最终报告:
   State = {
       "老板需求": "做个新产品分析",
       "市场数据": {...},
       "技术方案": "使用 Python + React",
       "财务预算": 100000.0,
       "最终报告": "综合市场、技术、财务的完整方案"  ← 新增
   }

注意

  • 每个部门(节点)都是独立工作
  • 他们通过**广场(State)**协作,不需要直接沟通
  • 广场上的信息越来越丰富,最终汇聚成完整结果

信息广场 vs 传统函数调用

传统方式(点对点传递)

python
# 像接力赛,信息一个人传一个人
需求 = 老板.发布需求()
市场数据 = 市场部.调研(需求)           # 只能看到需求
技术方案 = 技术部.设计(需求, 市场数据)  # 只能看到需求和市场数据
预算 = 财务部.计算(技术方案)           # 只能看到技术方案
报告 = 汇总.生成(需求, 市场数据, 技术方案, 预算)  # 要手动传所有参数!

问题:

  • 后面的函数要知道前面所有函数的参数
  • 改一个地方,所有函数调用都要改
  • 信息只能单向流动,不能回头看

State 方式(信息广场)

python
# 每个节点只负责:1. 看广场  2. 贴公告
def 市场部(state):
    需求 = state["老板需求"]  # 从广场看
    市场数据 = 做调研(需求)
    return {"市场数据": 市场数据}  # 贴到广场

def 技术部(state):
    需求 = state["老板需求"]      # 从广场看
    市场 = state["市场数据"]      # 从广场看
    方案 = 设计方案(需求, 市场)
    return {"技术方案": 方案}  # 贴到广场

def 财务部(state):
    方案 = state["技术方案"]  # 从广场看
    预算 = 计算预算(方案)
    return {"财务预算": 预算}  # 贴到广场

def 汇总(state):
    # 从广场一次性看到所有信息!
    所有信息 = state
    报告 = 生成报告(所有信息)
    return {"最终报告": 报告}  # 贴到广场

优势:

  • ✅ 每个节点只关心广场(state),不需要知道其他节点
  • ✅ 添加新节点不影响已有节点
  • ✅ 信息全局可见,任何节点都能回头看之前的数据
  • ✅ 解耦合,易扩展

信息广场的"分区管理"(子图的状态隔离)

当广场太大、信息太杂时,可以设立分区广场(子图)。

         总广场(主图 State)
    ┌────────────────────────┐
    │ 用户需求: "..."         │
    │ 市场报告: "..."  ←──────┼─── 来自市场分区
    │ 技术报告: "..."  ←──────┼─── 来自技术分区
    │ 最终方案: "..."         │
    └────────────────────────┘
              ↓          ↓
     ┌───────────┐  ┌───────────┐
     │市场分区广场│  │技术分区广场│
     │───────────│  │───────────│
     │调研数据: ..│  │代码设计: ..│  ← 这些细节
     │竞品分析: ..│  │架构图: ...│  ← 总广场看不到
     │用户访谈: ..│  │技术选型: ..│  ← 只在分区内
     │市场报告: ..│  │技术报告: ..│  ← 最终结果贴到总广场
     └───────────┘  └───────────┘

比喻

  • 总广场(主图 State):市政府大广场,只看各部门的最终汇报
  • 分区广场(子图 State):各部门内部的小广场,处理细节工作

好处

  1. 信息分层:总广场不会被细节淹没
  2. 并行工作:各分区可以同时工作,互不干扰
  3. 隐私保护:分区内部的工作细节不会暴露到总广场

生活类比:做菜的菜谱本

想象你在做一道复杂的菜:

python
# 菜谱状态(State)
class 做菜State:
    食材: List[str]        # 初始输入
    切好的菜: List[str]    # 第一步产生
    调好的酱料: str        # 第二步产生
    炒好的半成品: str      # 第三步产生
    最终成品: str          # 最后一步产生

执行流程:

开始 → 洗菜切菜 → 调酱料 → 炒菜 → 装盘 → 结束
      ↓        ↓       ↓      ↓
    更新State 更新State 更新State 更新State

每个节点(步骤)做两件事:

  1. 读取 State 中的数据(看菜谱本,知道上一步做了什么)
  2. 更新 State 中的数据(在菜谱本上记录这一步的结果)

State 的三大特性

1. 累积性(Accumulative)

State 像滚雪球一样,越滚越大,每个节点都往上加东西。

python
# 初始状态
state = {"食材": ["鸡蛋", "番茄"]}

# 节点1:洗菜切菜
def 切菜(state):
    return {"切好的菜": ["番茄块", "鸡蛋液"]}
# 现在 state = {
#     "食材": ["鸡蛋", "番茄"],
#     "切好的菜": ["番茄块", "鸡蛋液"]  ← 新增
# }

# 节点2:调酱料
def 调酱料(state):
    return {"调好的酱料": "盐+糖+酱油"}
# 现在 state = {
#     "食材": ["鸡蛋", "番茄"],
#     "切好的菜": ["番茄块", "鸡蛋液"],
#     "调好的酱料": "盐+糖+酱油"  ← 又新增
# }

# 节点3:炒菜
def 炒菜(state):
= state["切好的菜"]      # 读取之前的数据
    酱料 = state["调好的酱料"]  # 读取之前的数据
    return {"最终成品": f"炒{} with {酱料}"}
# 现在 state = {
#     "食材": ["鸡蛋", "番茄"],
#     "切好的菜": ["番茄块", "鸡蛋液"],
#     "调好的酱料": "盐+糖+酱油",
#     "最终成品": "炒番茄鸡蛋 with 调料"  ← 继续新增
# }

关键点:每个节点不会删除之前的数据,只会添加新数据。


2. 可覆盖性(Overwritable)

如果节点返回了同名字段,会覆盖旧值。

python
# 初始状态
state = {"温度": "常温"}

# 节点1:加热
def 加热(state):
    return {"温度": "100度"}
# state = {"温度": "100度"}  ← 覆盖了"常温"

# 节点2:冷却
def 冷却(state):
    return {"温度": "50度"}
# state = {"温度": "50度"}  ← 又覆盖了"100度"

默认行为:后面的节点会覆盖前面节点的同名字段。

自定义行为:使用 Reducer 改变合并方式

python
from operator import add
from typing import Annotated

class State(TypedDict):
    # 默认行为:覆盖
    温度: str

    # 自定义行为:累加
    日志: Annotated[List[str], add]

# 使用
state = {"日志": ["步骤1"]}

# 节点1
def 节点1(state):
    return {"日志": ["步骤2"]}
# state = {"日志": ["步骤1", "步骤2"]}  ← 合并,不是覆盖!

# 节点2
def 节点2(state):
    return {"日志": ["步骤3"]}
# state = {"日志": ["步骤1", "步骤2", "步骤3"]}  ← 继续合并

3. 节点间通信的唯一方式

在 LangGraph 中,节点之间不能直接对话,只能通过 State 交流。

错误方式(节点想直接调用):

python
def 节点A(state):
    result = 节点B(state)  # 错误!不能这样
    return {"data": result}

正确方式(通过 State 通信):

python
# 节点A 写入 State
def 节点A(state):
    return {"中间结果": "A的数据"}

# 节点B 从 State 读取
def 节点B(state):
    a_data = state["中间结果"]  # 读取A写入的数据
    return {"最终结果": f"处理了{a_data}"}

# 在图中连接
graph.add_edge("节点A", "节点B")

类比

  • State = 传纸条
  • 节点A 在纸条上写 "中间结果 = XXX"
  • 节点B 看纸条,知道了 A 的结果

State 的设计原则

原则 1:只包含必要的字段

不好的设计(太多无用字段):

python
class State(TypedDict):
    用户输入: str
    临时变量1: str     # 只有一个节点用
    临时变量2: int     # 只有一个节点用
    临时变量3: list    # 只有一个节点用
    调试信息: str      # 开发时用,生产不需要
    最终结果: str

好的设计(只保留必要的):

python
class State(TypedDict):
    用户输入: str      # 输入
    最终结果: str      # 输出
    # 临时变量在函数内部定义,不放 State

技巧:如果某个数据只在一个节点内部使用,就用局部变量,不要放 State。


原则 2:用子图隔离复杂状态

当状态变得复杂时,用子图拆分。

不好的设计(状态臃肿):

python
class 巨型State(TypedDict):
    # 主流程字段
    用户输入: str
    最终结果: str

    # 子任务A的字段
    A_中间数据1: str
    A_中间数据2: List
    A_结果: str

    # 子任务B的字段
    B_中间数据1: dict
    B_中间数据2: str
    B_结果: str

    # 全混在一起!

好的设计(用子图分离):

python
# 主图状态(只关心最终结果)
class 主State(TypedDict):
    用户输入: str
    A_结果: str        # 来自子图A
    B_结果: str        # 来自子图B
    最终结果: str

# 子图A状态(有自己的中间变量)
class 子图AState(TypedDict):
    用户输入: str      # 输入
    中间数据1: str     # 内部使用
    中间数据2: List    # 内部使用
    A_结果: str        # 输出

# 子图B状态(有自己的中间变量)
class 子图BState(TypedDict):
    用户输入: str      # 输入
    中间数据1: dict    # 内部使用
    中间数据2: str     # 内部使用
    B_结果: str        # 输出

原则 3:明确输入和输出

使用 output_schema 明确子图的输出。

python
# 子图内部状态(完整的工作空间)
class 内部State(TypedDict):
    输入数据: str
    临时数据1: str
    临时数据2: int
    最终结果: str

# 子图输出状态(只输出必要的)
class 输出State(TypedDict):
    最终结果: str      # 只输出这一个

# 构建子图
sub_graph = StateGraph(
    state_schema=内部State,      # 内部用完整状态
    output_schema=输出State       # 输出时只返回最终结果
)

好处

  • 主图不会被子图的临时数据污染
  • 清晰的接口约定
  • 易于测试和维护

State 实战示例:多步骤推理

python
from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END

# 定义状态
class ReasoningState(TypedDict):
    问题: str                  # 输入
    思考步骤: List[str]        # 累积的推理过程
    最终答案: str              # 输出

# 节点1:分析问题
def 分析问题(state):
    问题 = state["问题"]
    分析 = f"分析:{问题}涉及数学计算"
    return {"思考步骤": [分析]}  # 第一个思考步骤

# 节点2:制定方案
def 制定方案(state):
    方案 = "方案:先计算子问题,再汇总"
    return {"思考步骤": [方案]}  # 会和之前的合并(如果用 Reducer)

# 节点3:执行计算
def 执行计算(state):
    步骤 = state["思考步骤"]  # 读取之前的推理
    计算 = "执行:2 + 2 = 4"
    return {
        "思考步骤": [计算],
        "最终答案": "4"
    }

# 构建图
graph = StateGraph(ReasoningState)
graph.add_node("分析", 分析问题)
graph.add_node("方案", 制定方案)
graph.add_node("计算", 执行计算)
graph.add_edge(START, "分析")
graph.add_edge("分析", "方案")
graph.add_edge("方案", "计算")
graph.add_edge("计算", END)

# 运行
result = graph.compile().invoke({"问题": "2 + 2 = ?"})

# 结果
{
    "问题": "2 + 2 = ?",
    "思考步骤": [
        "分析:2 + 2 = ?涉及数学计算",
        "方案:先计算子问题,再汇总",
        "执行:2 + 2 = 4"
    ],
    "最终答案": "4"
}

State 调试技巧

  1. 打印状态变化
python
def 调试节点(state):
    print(f"当前状态:{state}")
    # 你的逻辑
    result = {"新字段": "值"}
    print(f"返回数据:{result}")
    return result
  1. 使用 LangSmith 追踪
python
# 在 LangSmith 中可以看到每个节点后的 State 快照
# 帮助理解数据流动
  1. 单元测试节点
python
# 单独测试节点函数
def test_节点():
    mock_state = {"输入": "测试数据"}
    result = 节点函数(mock_state)
    assert result == {"期望输出": "..."}

总结:State 的本质

  1. State 是数据的流水账

    • 记录从开始到现在的所有数据
    • 每个节点都往上添加新数据
  2. State 是节点间的传纸条

    • 节点通过 State 通信
    • 前一个节点写,后一个节点读
  3. State 需要精心设计

    • 只包含必要字段
    • 复杂状态用子图隔离
    • 用 output_schema 控制输出
  4. State 支持灵活的合并策略

    • 默认覆盖
    • 用 Reducer 自定义(如列表合并)

记住这个公式

LangGraph = 节点(处理逻辑) + 边(执行顺序) + State(数据流动)

State 是三者中最核心的,理解了 State,就理解了 LangGraph 的运作原理!

基于 MIT 许可证发布。内容版权归作者所有。