LangGraph 子图(Sub-graph)详细解读
📋 案例概览
这个案例展示了 LangGraph 中**子图(Sub-graph)**的核心概念和使用方法。虽然代码相对简单,但它揭示了一个非常重要的架构模式:如何在复杂系统中管理不同的状态空间。
业务场景
想象你在运营一个问答系统,需要定期分析用户日志:
- 日志清洗:原始日志需要预处理
- 失败分析:找出回答质量差的记录,生成改进报告
- 问题总结:分析用户最关心的主题,发送到 Slack 通知团队
核心挑战
这两个分析任务(失败分析 & 问题总结)需要:
- 并行执行以提高效率
- 独立的状态空间(各自有不同的中间变量)
- 共享输入数据(都需要访问清洗后的日志)
- 汇总输出结果到主图
子图正是解决这个问题的最佳方案!
🗺️ 整体架构图
用户输入原始日志(raw_logs)
↓
┌─────────┐
│clean_logs│ 清洗日志
└─────────┘
↓ cleaned_logs(共享数据)
┌────┴────┐
↓ ↓
┌──────┐ ┌──────┐
│失败分析│ │问题总结│ (两个独立子图并行运行)
│子图 │ │子图 │
└──────┘ └──────┘
↓ ↓
fa_summary report
└────┬────┘
↓
最终结果汇总
📚 代码分段详解
第一部分:定义数据结构
Log 数据模型
from typing_extensions import TypedDict
from typing import List, Optional
class Log(TypedDict):
id: str # 日志 ID
question: str # 用户问题
docs: Optional[List] # 检索到的文档(可选)
answer: str # 系统回答
grade: Optional[int] # 质量评分(可选)
grader: Optional[str]# 评分器名称(可选)
feedback: Optional[str] # 反馈信息(可选)
Python 知识点:
TypedDict:这是 Python 3.8+ 引入的类型,用于定义字典的结构
python# 普通字典 - 不知道有哪些键 log1 = {"id": "1", "question": "xxx"} # TypedDict - 明确定义了键和类型 log2: Log = {"id": "1", "question": "xxx", ...}
Optional[类型]:表示这个字段可以是指定类型,也可以是
None
pythongrade: Optional[int] # 等价于 grade: int | None
为什么这样设计?
- 并非所有日志都有评分(grade),只有被评估过的才有
- 这种灵活的数据结构适配实际业务场景
第二部分:失败分析子图
状态定义
# 子图内部状态(完整状态)
class FailureAnalysisState(TypedDict):
cleaned_logs: List[Log] # 输入:清洗后的日志
failures: List[Log] # 中间变量:失败的日志
fa_summary: str # 输出:失败分析摘要
processed_logs: List[str] # 输出:处理过的日志 ID
# 子图输出状态(只包含需要返回给主图的字段)
class FailureAnalysisOutputState(TypedDict):
fa_summary: str # 只输出摘要
processed_logs: List[str] # 只输出处理记录
关键概念:为什么需要两个状态?
内部状态(FailureAnalysisState):
- 包含所有中间变量(如
failures
) - 用于子图内部节点之间传递数据
- 包含所有中间变量(如
输出状态(FailureAnalysisOutputState):
- 只包含需要返回给主图的字段
- 避免不必要的数据传递
- 减少状态冲突的可能性
类比:
- 内部状态 = 厨房里的所有工具和原料
- 输出状态 = 端给客人的成品菜肴
节点函数
1. get_failures - 过滤失败日志
def get_failures(state):
""" 获取包含失败记录的日志 """
cleaned_logs = state["cleaned_logs"]
# 筛选出有 grade 字段的日志(表示被评分过)
failures = [log for log in cleaned_logs if "grade" in log]
return {"failures": failures}
Python 知识点 - 列表推导式:
# 传统写法
failures = []
for log in cleaned_logs:
if "grade" in log:
failures.append(log)
# 列表推导式(更简洁)
failures = [log for log in cleaned_logs if "grade" in log]
业务逻辑:
- 只有被评估过的日志才有
grade
字段 - 这些日志通常是因为质量问题需要人工审核的
2. generate_summary - 生成摘要
def generate_summary(state):
""" 生成失败分析摘要 """
failures = state["failures"]
# 实际应用中,这里会调用 LLM 分析失败原因
# fa_summary = summarize_with_llm(failures)
fa_summary = "Poor quality retrieval of Chroma documentation."
# 记录处理过的日志 ID
processed_logs = [
f"failure-analysis-on-log-{failure['id']}"
for failure in failures
]
return {
"fa_summary": fa_summary,
"processed_logs": processed_logs
}
技术细节:
f-string 格式化字符串(Python 3.6+):
pythonfailure_id = "123" # 传统方式 msg = "failure-analysis-on-log-" + failure_id # f-string(更直观) msg = f"failure-analysis-on-log-{failure_id}"
processed_logs 的作用:
- 追踪哪些日志被处理过
- 用于审计和调试
- 主图会收集所有子图的处理记录
构建子图
from langgraph.graph import StateGraph, START, END
fa_builder = StateGraph(
state_schema=FailureAnalysisState, # 内部状态
output_schema=FailureAnalysisOutputState # 输出状态
)
# 添加节点
fa_builder.add_node("get_failures", get_failures)
fa_builder.add_node("generate_summary", generate_summary)
# 添加边(定义执行顺序)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)
执行流程:
START → get_failures → generate_summary → END
LangGraph 概念:
START
:特殊节点,表示图的入口END
:特殊节点,表示图的出口add_edge(A, B)
:从节点 A 到节点 B 的有向边
第三部分:问题总结子图
状态定义
# 内部状态
class QuestionSummarizationState(TypedDict):
cleaned_logs: List[Log] # 输入
qs_summary: str # 中间变量:问题摘要
report: str # 输出:最终报告
processed_logs: List[str] # 输出:处理记录
# 输出状态
class QuestionSummarizationOutputState(TypedDict):
report: str # 只输出报告
processed_logs: List[str] # 只输出处理记录
注意:qs_summary
是中间变量,不需要返回给主图
节点函数
1. generate_summary - 生成问题摘要
def generate_summary(state):
cleaned_logs = state["cleaned_logs"]
# 实际会调用 LLM 分析用户问题的主题
summary = "Questions focused on usage of ChatOllama and Chroma vector store."
# 记录处理的日志
processed_logs = [
f"summary-on-log-{log['id']}"
for log in cleaned_logs
]
return {
"qs_summary": summary,
"processed_logs": processed_logs
}
2. send_to_slack - 发送报告
def send_to_slack(state):
qs_summary = state["qs_summary"]
# 实际会基于摘要生成完整报告并发送到 Slack
# report = generate_report(qs_summary)
# send_to_slack_api(report)
report = "foo bar baz"
return {"report": report}
业务流程:
清洗日志 → 生成摘要 → 格式化报告 → 发送到 Slack
构建子图
qs_builder = StateGraph(
QuestionSummarizationState,
output_schema=QuestionSummarizationOutputState
)
qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)
qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)
第四部分:主图 - 整合子图
主图状态
from operator import add
from typing import Annotated
class EntryGraphState(TypedDict):
raw_logs: List[Log] # 输入:原始日志
cleaned_logs: List[Log] # 清洗后的日志
fa_summary: str # 来自失败分析子图
report: str # 来自问题总结子图
processed_logs: Annotated[List[int], add] # 来自两个子图,需要合并
核心技术点 - Annotated 和 Reducer:
processed_logs: Annotated[List[int], add]
这行代码包含了三个重要概念:
Annotated:Python 3.9+ 的类型注解增强工具
pythonfrom typing import Annotated # 基本类型注解 name: str # 带额外元数据的注解 age: Annotated[int, "must be positive"]
Reducer(合并器):
add
来自operator
模块pythonfrom operator import add # add 的作用是合并列表 result = add([1, 2], [3, 4]) # 结果:[1, 2, 3, 4]
为什么需要 Reducer?
因为两个子图并行运行,都会返回
processed_logs
:python# 失败分析子图返回 {"processed_logs": ["failure-analysis-on-log-2"]} # 问题总结子图返回 {"processed_logs": ["summary-on-log-1", "summary-on-log-2"]} # LangGraph 需要知道如何合并这两个结果 # 使用 add 后: {"processed_logs": [ "failure-analysis-on-log-2", "summary-on-log-1", "summary-on-log-2" ]}
常见的 Reducer:
from operator import add
# 列表合并
Annotated[List[str], add] # [1, 2] + [3] = [1, 2, 3]
# 自定义 Reducer
def merge_dicts(a, b):
return {**a, **b}
Annotated[dict, merge_dicts]
为什么 cleaned_logs 不需要 Reducer?
问题:cleaned_logs
也会被两个子图访问,为什么不需要 Annotated
?
答案:因为我们定义了 output_schema
!
# 失败分析子图的输出状态(不包含 cleaned_logs)
class FailureAnalysisOutputState(TypedDict):
fa_summary: str
processed_logs: List[str]
# 注意:没有 cleaned_logs
# 问题总结子图的输出状态(也不包含 cleaned_logs)
class QuestionSummarizationOutputState(TypedDict):
report: str
processed_logs: List[str]
# 注意:也没有 cleaned_logs
机制解释:
没有 output_schema 时:
pythonStateGraph(MyState) # 所有状态字段都会输出
- 子图会返回所有状态字段
cleaned_logs
会被两个子图都返回- 需要 Reducer 来合并
有 output_schema 时:
pythonStateGraph(MyState, output_schema=MyOutputState)
- 子图只返回
MyOutputState
中定义的字段 cleaned_logs
不在输出状态中,不会返回- 不需要 Reducer
- 子图只返回
类比:
- 没有 output_schema = 函数返回所有局部变量
- 有 output_schema = 函数只返回你指定的值
主图节点和构建
clean_logs 节点:
def clean_logs(state):
""" 清洗日志 """
raw_logs = state["raw_logs"]
# 实际应用中会进行数据清洗:
# - 去重
# - 格式标准化
# - 过滤无效数据
cleaned_logs = raw_logs # 简化示例
return {"cleaned_logs": cleaned_logs}
构建主图:
entry_builder = StateGraph(EntryGraphState)
# 添加常规节点
entry_builder.add_node("clean_logs", clean_logs)
# 关键:将编译后的子图作为节点添加
entry_builder.add_node(
"question_summarization",
qs_builder.compile() # 编译子图
)
entry_builder.add_node(
"failure_analysis",
fa_builder.compile() # 编译子图
)
# 定义执行流程
entry_builder.add_edge(START, "clean_logs")
entry_builder.add_edge("clean_logs", "failure_analysis")
entry_builder.add_edge("clean_logs", "question_summarization")
entry_builder.add_edge("failure_analysis", END)
entry_builder.add_edge("question_summarization", END)
# 编译主图
graph = entry_builder.compile()
关键点:
子图也是节点:
pythonentry_builder.add_node("子图名称", 子图实例.compile())
并行执行:
pythonentry_builder.add_edge("clean_logs", "failure_analysis") entry_builder.add_edge("clean_logs", "question_summarization")
从
clean_logs
出发有两条边,两个子图会并行运行xray 参数:
pythongraph.get_graph(xray=1).draw_mermaid_png()
xray=1
会展开子图的内部结构,方便调试
第五部分:运行示例
准备测试数据
# 正常的问答日志
question_answer = Log(
id="1",
question="How can I import ChatOllama?",
answer="To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'",
# 没有 grade 字段,表示质量正常
)
# 有质量问题的日志
question_answer_feedback = Log(
id="2",
question="How can I use Chroma vector store?",
answer="To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).",
grade=0, # 评分为 0,表示质量差
grader="Document Relevance Recall",
feedback="The retrieved documents discuss vector stores in general, but not Chroma specifically",
)
raw_logs = [question_answer, question_answer_feedback]
执行图
result = graph.invoke({"raw_logs": raw_logs})
输出结果:
{
'raw_logs': [...], # 原始输入
'cleaned_logs': [...], # clean_logs 节点的输出
'fa_summary': 'Poor quality retrieval of Chroma documentation.',
# ↑ 来自失败分析子图
'report': 'foo bar baz',
# ↑ 来自问题总结子图
'processed_logs': [
'failure-analysis-on-log-2', # 失败分析处理了 log-2
'summary-on-log-1', # 问题总结处理了 log-1
'summary-on-log-2' # 问题总结处理了 log-2
]
# ↑ 两个子图的 processed_logs 通过 add 合并
}
🔑 核心技术解析
1. 子图的本质
子图是什么?
- 子图本身就是一个完整的 StateGraph
- 可以独立运行和测试
- 通过
compile()
编译后,可以作为节点嵌入其他图
类比:
- 主图 = 一个公司
- 子图 = 公司内的部门
- 每个部门有自己的工作流程(内部状态)
- 部门只向公司汇报关键结果(输出状态)
2. 状态通信机制
主图 → 子图(输入):
class EntryGraphState(TypedDict):
cleaned_logs: List[Log] # 主图有这个字段
class FailureAnalysisState(TypedDict):
cleaned_logs: List[Log] # 子图也有这个字段
当主图调用子图时,LangGraph 会自动传递同名字段:
# 主图状态
{"cleaned_logs": [...]}
# 自动传递给子图
# 子图可以访问 state["cleaned_logs"]
子图 → 主图(输出):
class FailureAnalysisOutputState(TypedDict):
fa_summary: str # 子图输出这个字段
class EntryGraphState(TypedDict):
fa_summary: str # 主图有同名字段接收
子图返回的字段会更新主图状态中的同名字段。
关键原则:
✅ 同名字段 = 通信通道
- 主图和子图通过同名字段传递数据
- 不同名的字段互不干扰
✅ 输入 = 状态交集
- 子图只能访问与主图同名的输入字段
✅ 输出 = output_schema
- 子图只返回 output_schema 中定义的字段
- 减少不必要的数据传递
3. 并行子图的结果合并
场景:两个子图并行运行,都返回同名字段
# 子图 A 返回
{"processed_logs": ["a", "b"]}
# 子图 B 返回
{"processed_logs": ["c", "d"]}
# 如何合并?
解决方案 1:使用 Reducer
from operator import add
class State(TypedDict):
processed_logs: Annotated[List[str], add] # 指定合并方式
结果:{"processed_logs": ["a", "b", "c", "d"]}
解决方案 2:使用不同的字段名
class EntryState(TypedDict):
fa_processed_logs: List[str] # 失败分析的记录
qs_processed_logs: List[str] # 问题总结的记录
class FAOutputState(TypedDict):
fa_processed_logs: List[str] # 不同名
class QSOutputState(TypedDict):
qs_processed_logs: List[str] # 不同名
这样两个子图返回不同的字段,不需要合并。
4. 何时使用子图?
适合使用子图的场景:
✅ 独立的业务逻辑
- 每个子图负责一个完整的子任务
- 有清晰的输入和输出边界
✅ 需要状态隔离
- 子图的中间变量不应该污染主图状态
- 例如:
failures
只在失败分析子图内使用
✅ 可复用的流程
- 同一个子图可以在不同的主图中复用
- 例如:日志清洗子图可用于多个场景
✅ 并行执行
- 多个独立任务需要同时运行
- 通过 Reducer 合并结果
不适合使用子图的场景:
❌ 简单的线性流程
- 如果只是几个顺序执行的节点,不需要子图
❌ 状态高度耦合
- 如果节点之间需要频繁共享中间状态
❌ 过度设计
- 不要为了用子图而用子图
💡 Python 知识点总结
1. TypedDict
from typing_extensions import TypedDict
# 定义字典结构
class Person(TypedDict):
name: str
age: int
# 使用
person: Person = {"name": "Alice", "age": 30}
优势:
- 提供类型检查
- IDE 自动补全
- 清晰的数据结构文档
2. Optional 类型
from typing import Optional
# 表示可选字段
grade: Optional[int] # 可以是 int 或 None
# 等价于
grade: int | None # Python 3.10+
3. Annotated 元数据
from typing import Annotated
# 基本用法
Age = Annotated[int, "must be positive"]
# LangGraph 用法
from operator import add
Logs = Annotated[List[str], add] # 指定合并方式
4. 列表推导式
# 基本形式
result = [expression for item in iterable if condition]
# 示例
numbers = [1, 2, 3, 4, 5]
even = [n for n in numbers if n % 2 == 0] # [2, 4]
squared = [n**2 for n in numbers] # [1, 4, 9, 16, 25]
5. f-string 格式化
name = "Alice"
age = 30
# 基本用法
msg = f"Name: {name}, Age: {age}"
# 表达式
msg = f"Next year: {age + 1}"
# 格式控制
pi = 3.14159
msg = f"Pi: {pi:.2f}" # "Pi: 3.14"
6. operator 模块
from operator import add
# 列表合并
add([1, 2], [3, 4]) # [1, 2, 3, 4]
# 数字相加
add(10, 20) # 30
# 其他常用操作符
from operator import mul, sub
mul(3, 4) # 12
sub(10, 3) # 7
🎯 实战建议
对于初学者
先理解状态通信
- 画出主图和子图的状态结构
- 标注哪些字段是共享的
从简单开始
- 先构建一个子图
- 测试通过后再添加第二个
- 最后处理并行和合并
使用可视化
python# 查看子图结构 fa_graph.get_graph().draw_mermaid_png() # 查看主图结构(展开子图) main_graph.get_graph(xray=1).draw_mermaid_png()
调试技巧
- 先单独测试每个子图
- 确保子图能独立运行
- 再集成到主图中
对于进阶使用
优化状态设计
- 只在 output_schema 中包含必要字段
- 使用有意义的字段名避免冲突
- 合理使用 Reducer
错误处理
pythondef safe_node(state): try: return process(state) except Exception as e: return {"error": str(e)}
性能优化
- 利用并行执行提高效率
- 避免传递大量不必要的数据
- 使用 xray 参数分析执行路径
可测试性
python# 单独测试子图 fa_graph = fa_builder.compile() result = fa_graph.invoke({"cleaned_logs": test_logs}) # 测试主图 main_graph = entry_builder.compile() result = main_graph.invoke({"raw_logs": test_logs})
📌 总结
核心概念
子图是独立的图
- 有自己的状态空间
- 可以独立运行和测试
- 通过 compile() 后可嵌入其他图
状态通信通过同名字段
- 主图 → 子图:自动传递同名输入字段
- 子图 → 主图:返回 output_schema 中的字段
并行子图需要处理结果合并
- 使用 Annotated + Reducer
- 或使用不同的字段名
设计原则
✅ 单一职责:每个子图负责一个独立任务 ✅ 状态隔离:中间变量不暴露给主图 ✅ 清晰接口:明确定义输入和输出状态 ✅ 可复用性:子图可在多个场景中使用
适用场景
- 多代理系统(每个代理是一个子图)
- 复杂业务流程的模块化
- 需要并行执行的独立任务
- 需要状态隔离的场景
🚀 扩展练习
添加第三个子图
- 创建一个"性能监控"子图
- 分析日志的响应时间
- 并行运行三个子图
实现错误处理
- 在子图中添加错误捕获
- 将错误信息返回给主图
- 主图根据错误决定是否继续
嵌套子图
- 在子图内部再嵌套子图
- 理解多层状态传递机制
动态子图选择
- 根据输入数据决定调用哪些子图
- 实现条件并行
最后的话:子图是 LangGraph 实现模块化和可扩展性的关键机制。虽然这个案例很简单,但它展示的"状态隔离"和"并行合并"思想在构建复杂的多代理系统时至关重要。掌握子图,你就掌握了构建大型 AI 系统的基础!
🧠 核心概念深度解读
"独立的状态空间"通俗解释
场景类比:公司办公室
想象一个公司有三个办公室:
前台办公室(主图)
- 负责接待客户、分配任务
- 有一个公共白板记录:客户信息、任务进度、最终结果
技术部办公室(子图 A)
- 有自己的小白板:记录代码 bug、修复方案、技术文档
- 这些中间过程前台看不到也不关心
- 完成后只告诉前台:"修好了,这是修复报告"
市场部办公室(子图 B)
- 有自己的小白板:记录调研数据、分析草稿、竞品资料
- 这些中间过程前台看不到也不关心
- 完成后只告诉前台:"搞定了,这是市场报告"
独立的状态空间 = 每个办公室有自己的小白板
# 主图状态(前台的公共白板)
class MainState:
客户需求: str # 前台记录
技术修复报告: str # 技术部提交的
市场分析报告: str # 市场部提交的
# 技术部子图状态(技术部的小白板)
class TechSubGraphState:
客户需求: str # 从前台复制过来的
bug列表: List[str] # 只在技术部用,前台不知道
修复代码: str # 只在技术部用,前台不知道
技术修复报告: str # 要提交给前台的
# 市场部子图状态(市场部的小白板)
class MarketSubGraphState:
客户需求: str # 从前台复制过来的
调研数据: List[dict] # 只在市场部用,前台不知道
竞品分析: str # 只在市场部用,前台不知道
市场分析报告: str # 要提交给前台的
为什么需要独立空间?
❌ 没有独立空间的问题:
# 如果都用同一个白板(共享状态)
全公司共享状态 = {
"客户需求": "...",
"bug列表": [...], # 技术部的
"修复代码": "...", # 技术部的
"调研数据": [...], # 市场部的
"竞品分析": "...", # 市场部的
# 全混在一起,乱套了!
}
问题:
- 前台不关心技术细节,却要看到
bug列表
、修复代码
- 技术部不关心市场数据,却要看到
调研数据
、竞品分析
- 字段名可能冲突(两个部门都想用
临时笔记
这个名字) - 状态臃肿,管理困难
✅ 有独立空间的好处:
# 技术部只关心自己的事
tech_state = {
"客户需求": "...", # 输入
"bug列表": [...], # 内部工作
"修复代码": "...", # 内部工作
"技术修复报告": "..." # 输出
}
# 市场部只关心自己的事
market_state = {
"客户需求": "...", # 输入
"调研数据": [...], # 内部工作
"竞品分析": "...", # 内部工作
"市场分析报告": "..." # 输出
}
# 前台只看最终结果
main_state = {
"客户需求": "...",
"技术修复报告": "...", # 来自技术部
"市场分析报告": "..." # 来自市场部
}
好处:
- ✅ 职责清晰:每个部门管好自己的事
- ✅ 避免污染:中间变量不会泄露到其他地方
- ✅ 可并行:技术部和市场部可以同时工作,互不干扰
- ✅ 易维护:修改技术部流程不影响市场部
LangGraph 的 State 核心概念(大白话版)
State 是什么?
State 就是一个数据流水本,记录了从开始到现在所有节点产生的数据。
核心比喻:State = 信息广场
想象一个城市的中心广场,所有人(节点)都会来这里:
- 📢 发布消息:把自己处理的结果贴到广场的公告板上
- 👀 查看消息:看看别人贴了什么信息,作为自己工作的输入
- 🔄 信息汇总:所有人的工作成果都汇集在这个广场
信息广场(State)
┌────────────────────────┐
│ 用户输入: "..." │ ← 初始信息
│ 节点A的结果: "..." │ ← 节点A 贴上去的
│ 节点B的结果: "..." │ ← 节点B 贴上去的
│ 节点C的结果: "..." │ ← 节点C 贴上去的
│ 最终答案: "..." │ ← 最后一个节点总结的
└────────────────────────┘
↑ ↑ ↑ ↑
│ │ │ │
节点A 节点B 节点C 节点D
(来贴)(来看)(来贴)(来看)
关键特点:
中心化汇聚
- 所有节点产生的信息都汇总到 State 这个广场
- 不是点对点传递,而是"我贴公告 → 你来看公告"
信息持久化
- 贴在广场上的信息不会消失(除非被覆盖)
- 后面的节点能看到前面所有节点的信息
异步协作
- 节点A 不需要等节点B 看完才继续
- 各自把信息贴上去就行,有依赖关系的节点自然会来看
全局可见
- 任何节点都能看到广场上的所有信息
- 但实际上,节点只关心自己需要的那部分
信息广场的运作机制
场景:多人协作写报告
# 广场上的公告板(State)
class 报告State(TypedDict):
老板需求: str # 老板发布的
市场数据: dict # 市场部贴的
技术方案: str # 技术部贴的
财务预算: float # 财务部贴的
最终报告: str # 汇总人员生成的
执行流程:
1. 老板来广场,贴了个需求:
State = {"老板需求": "做个新产品分析"}
2. 市场部来广场看需求,做完调研后贴结果:
State = {
"老板需求": "做个新产品分析",
"市场数据": {"用户量": 1000, "竞品": ["A", "B"]} ← 新增
}
3. 技术部来广场看需求和市场数据,贴技术方案:
State = {
"老板需求": "做个新产品分析",
"市场数据": {...},
"技术方案": "使用 Python + React" ← 新增
}
4. 财务部来广场看技术方案,贴预算:
State = {
"老板需求": "做个新产品分析",
"市场数据": {...},
"技术方案": "使用 Python + React",
"财务预算": 100000.0 ← 新增
}
5. 汇总人员来广场看所有信息,生成最终报告:
State = {
"老板需求": "做个新产品分析",
"市场数据": {...},
"技术方案": "使用 Python + React",
"财务预算": 100000.0,
"最终报告": "综合市场、技术、财务的完整方案" ← 新增
}
注意:
- 每个部门(节点)都是独立工作的
- 他们通过**广场(State)**协作,不需要直接沟通
- 广场上的信息越来越丰富,最终汇聚成完整结果
信息广场 vs 传统函数调用
❌ 传统方式(点对点传递):
# 像接力赛,信息一个人传一个人
需求 = 老板.发布需求()
市场数据 = 市场部.调研(需求) # 只能看到需求
技术方案 = 技术部.设计(需求, 市场数据) # 只能看到需求和市场数据
预算 = 财务部.计算(技术方案) # 只能看到技术方案
报告 = 汇总.生成(需求, 市场数据, 技术方案, 预算) # 要手动传所有参数!
问题:
- 后面的函数要知道前面所有函数的参数
- 改一个地方,所有函数调用都要改
- 信息只能单向流动,不能回头看
✅ State 方式(信息广场):
# 每个节点只负责:1. 看广场 2. 贴公告
def 市场部(state):
需求 = state["老板需求"] # 从广场看
市场数据 = 做调研(需求)
return {"市场数据": 市场数据} # 贴到广场
def 技术部(state):
需求 = state["老板需求"] # 从广场看
市场 = state["市场数据"] # 从广场看
方案 = 设计方案(需求, 市场)
return {"技术方案": 方案} # 贴到广场
def 财务部(state):
方案 = state["技术方案"] # 从广场看
预算 = 计算预算(方案)
return {"财务预算": 预算} # 贴到广场
def 汇总(state):
# 从广场一次性看到所有信息!
所有信息 = state
报告 = 生成报告(所有信息)
return {"最终报告": 报告} # 贴到广场
优势:
- ✅ 每个节点只关心广场(state),不需要知道其他节点
- ✅ 添加新节点不影响已有节点
- ✅ 信息全局可见,任何节点都能回头看之前的数据
- ✅ 解耦合,易扩展
信息广场的"分区管理"(子图的状态隔离)
当广场太大、信息太杂时,可以设立分区广场(子图)。
总广场(主图 State)
┌────────────────────────┐
│ 用户需求: "..." │
│ 市场报告: "..." ←──────┼─── 来自市场分区
│ 技术报告: "..." ←──────┼─── 来自技术分区
│ 最终方案: "..." │
└────────────────────────┘
↓ ↓
┌───────────┐ ┌───────────┐
│市场分区广场│ │技术分区广场│
│───────────│ │───────────│
│调研数据: ..│ │代码设计: ..│ ← 这些细节
│竞品分析: ..│ │架构图: ...│ ← 总广场看不到
│用户访谈: ..│ │技术选型: ..│ ← 只在分区内
│市场报告: ..│ │技术报告: ..│ ← 最终结果贴到总广场
└───────────┘ └───────────┘
比喻:
- 总广场(主图 State):市政府大广场,只看各部门的最终汇报
- 分区广场(子图 State):各部门内部的小广场,处理细节工作
好处:
- 信息分层:总广场不会被细节淹没
- 并行工作:各分区可以同时工作,互不干扰
- 隐私保护:分区内部的工作细节不会暴露到总广场
生活类比:做菜的菜谱本
想象你在做一道复杂的菜:
# 菜谱状态(State)
class 做菜State:
食材: List[str] # 初始输入
切好的菜: List[str] # 第一步产生
调好的酱料: str # 第二步产生
炒好的半成品: str # 第三步产生
最终成品: str # 最后一步产生
执行流程:
开始 → 洗菜切菜 → 调酱料 → 炒菜 → 装盘 → 结束
↓ ↓ ↓ ↓
更新State 更新State 更新State 更新State
每个节点(步骤)做两件事:
- 读取 State 中的数据(看菜谱本,知道上一步做了什么)
- 更新 State 中的数据(在菜谱本上记录这一步的结果)
State 的三大特性
1. 累积性(Accumulative)
State 像滚雪球一样,越滚越大,每个节点都往上加东西。
# 初始状态
state = {"食材": ["鸡蛋", "番茄"]}
# 节点1:洗菜切菜
def 切菜(state):
return {"切好的菜": ["番茄块", "鸡蛋液"]}
# 现在 state = {
# "食材": ["鸡蛋", "番茄"],
# "切好的菜": ["番茄块", "鸡蛋液"] ← 新增
# }
# 节点2:调酱料
def 调酱料(state):
return {"调好的酱料": "盐+糖+酱油"}
# 现在 state = {
# "食材": ["鸡蛋", "番茄"],
# "切好的菜": ["番茄块", "鸡蛋液"],
# "调好的酱料": "盐+糖+酱油" ← 又新增
# }
# 节点3:炒菜
def 炒菜(state):
菜 = state["切好的菜"] # 读取之前的数据
酱料 = state["调好的酱料"] # 读取之前的数据
return {"最终成品": f"炒{菜} with {酱料}"}
# 现在 state = {
# "食材": ["鸡蛋", "番茄"],
# "切好的菜": ["番茄块", "鸡蛋液"],
# "调好的酱料": "盐+糖+酱油",
# "最终成品": "炒番茄鸡蛋 with 调料" ← 继续新增
# }
关键点:每个节点不会删除之前的数据,只会添加新数据。
2. 可覆盖性(Overwritable)
如果节点返回了同名字段,会覆盖旧值。
# 初始状态
state = {"温度": "常温"}
# 节点1:加热
def 加热(state):
return {"温度": "100度"}
# state = {"温度": "100度"} ← 覆盖了"常温"
# 节点2:冷却
def 冷却(state):
return {"温度": "50度"}
# state = {"温度": "50度"} ← 又覆盖了"100度"
默认行为:后面的节点会覆盖前面节点的同名字段。
自定义行为:使用 Reducer 改变合并方式
from operator import add
from typing import Annotated
class State(TypedDict):
# 默认行为:覆盖
温度: str
# 自定义行为:累加
日志: Annotated[List[str], add]
# 使用
state = {"日志": ["步骤1"]}
# 节点1
def 节点1(state):
return {"日志": ["步骤2"]}
# state = {"日志": ["步骤1", "步骤2"]} ← 合并,不是覆盖!
# 节点2
def 节点2(state):
return {"日志": ["步骤3"]}
# state = {"日志": ["步骤1", "步骤2", "步骤3"]} ← 继续合并
3. 节点间通信的唯一方式
在 LangGraph 中,节点之间不能直接对话,只能通过 State 交流。
❌ 错误方式(节点想直接调用):
def 节点A(state):
result = 节点B(state) # 错误!不能这样
return {"data": result}
✅ 正确方式(通过 State 通信):
# 节点A 写入 State
def 节点A(state):
return {"中间结果": "A的数据"}
# 节点B 从 State 读取
def 节点B(state):
a_data = state["中间结果"] # 读取A写入的数据
return {"最终结果": f"处理了{a_data}"}
# 在图中连接
graph.add_edge("节点A", "节点B")
类比:
- State = 传纸条
- 节点A 在纸条上写 "中间结果 = XXX"
- 节点B 看纸条,知道了 A 的结果
State 的设计原则
原则 1:只包含必要的字段
❌ 不好的设计(太多无用字段):
class State(TypedDict):
用户输入: str
临时变量1: str # 只有一个节点用
临时变量2: int # 只有一个节点用
临时变量3: list # 只有一个节点用
调试信息: str # 开发时用,生产不需要
最终结果: str
✅ 好的设计(只保留必要的):
class State(TypedDict):
用户输入: str # 输入
最终结果: str # 输出
# 临时变量在函数内部定义,不放 State
技巧:如果某个数据只在一个节点内部使用,就用局部变量,不要放 State。
原则 2:用子图隔离复杂状态
当状态变得复杂时,用子图拆分。
❌ 不好的设计(状态臃肿):
class 巨型State(TypedDict):
# 主流程字段
用户输入: str
最终结果: str
# 子任务A的字段
A_中间数据1: str
A_中间数据2: List
A_结果: str
# 子任务B的字段
B_中间数据1: dict
B_中间数据2: str
B_结果: str
# 全混在一起!
✅ 好的设计(用子图分离):
# 主图状态(只关心最终结果)
class 主State(TypedDict):
用户输入: str
A_结果: str # 来自子图A
B_结果: str # 来自子图B
最终结果: str
# 子图A状态(有自己的中间变量)
class 子图AState(TypedDict):
用户输入: str # 输入
中间数据1: str # 内部使用
中间数据2: List # 内部使用
A_结果: str # 输出
# 子图B状态(有自己的中间变量)
class 子图BState(TypedDict):
用户输入: str # 输入
中间数据1: dict # 内部使用
中间数据2: str # 内部使用
B_结果: str # 输出
原则 3:明确输入和输出
使用 output_schema
明确子图的输出。
# 子图内部状态(完整的工作空间)
class 内部State(TypedDict):
输入数据: str
临时数据1: str
临时数据2: int
最终结果: str
# 子图输出状态(只输出必要的)
class 输出State(TypedDict):
最终结果: str # 只输出这一个
# 构建子图
sub_graph = StateGraph(
state_schema=内部State, # 内部用完整状态
output_schema=输出State # 输出时只返回最终结果
)
好处:
- 主图不会被子图的临时数据污染
- 清晰的接口约定
- 易于测试和维护
State 实战示例:多步骤推理
from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END
# 定义状态
class ReasoningState(TypedDict):
问题: str # 输入
思考步骤: List[str] # 累积的推理过程
最终答案: str # 输出
# 节点1:分析问题
def 分析问题(state):
问题 = state["问题"]
分析 = f"分析:{问题}涉及数学计算"
return {"思考步骤": [分析]} # 第一个思考步骤
# 节点2:制定方案
def 制定方案(state):
方案 = "方案:先计算子问题,再汇总"
return {"思考步骤": [方案]} # 会和之前的合并(如果用 Reducer)
# 节点3:执行计算
def 执行计算(state):
步骤 = state["思考步骤"] # 读取之前的推理
计算 = "执行:2 + 2 = 4"
return {
"思考步骤": [计算],
"最终答案": "4"
}
# 构建图
graph = StateGraph(ReasoningState)
graph.add_node("分析", 分析问题)
graph.add_node("方案", 制定方案)
graph.add_node("计算", 执行计算)
graph.add_edge(START, "分析")
graph.add_edge("分析", "方案")
graph.add_edge("方案", "计算")
graph.add_edge("计算", END)
# 运行
result = graph.compile().invoke({"问题": "2 + 2 = ?"})
# 结果
{
"问题": "2 + 2 = ?",
"思考步骤": [
"分析:2 + 2 = ?涉及数学计算",
"方案:先计算子问题,再汇总",
"执行:2 + 2 = 4"
],
"最终答案": "4"
}
State 调试技巧
- 打印状态变化
def 调试节点(state):
print(f"当前状态:{state}")
# 你的逻辑
result = {"新字段": "值"}
print(f"返回数据:{result}")
return result
- 使用 LangSmith 追踪
# 在 LangSmith 中可以看到每个节点后的 State 快照
# 帮助理解数据流动
- 单元测试节点
# 单独测试节点函数
def test_节点():
mock_state = {"输入": "测试数据"}
result = 节点函数(mock_state)
assert result == {"期望输出": "..."}
总结:State 的本质
State 是数据的流水账
- 记录从开始到现在的所有数据
- 每个节点都往上添加新数据
State 是节点间的传纸条
- 节点通过 State 通信
- 前一个节点写,后一个节点读
State 需要精心设计
- 只包含必要字段
- 复杂状态用子图隔离
- 用 output_schema 控制输出
State 支持灵活的合并策略
- 默认覆盖
- 用 Reducer 自定义(如列表合并)
记住这个公式:
LangGraph = 节点(处理逻辑) + 边(执行顺序) + State(数据流动)
State 是三者中最核心的,理解了 State,就理解了 LangGraph 的运作原理!