高级路由与控制流
在上一节中,我们学习了 LangGraph 的基础概念:State、Node、Edge 和条件边。本节将深入探讨 LangGraph 中更高级的路由和控制流机制,包括:
- 可复用的路由函数:返回布尔值或枚举的路由模式
- Send 动态分发:并行处理多个任务
- Command API:命令式控制流,显式指定下一步
- Subgraph 子图:将复杂流程模块化
这些特性是 LangGraph 1.x 版本的核心能力,掌握它们能让你构建更复杂、更灵活的 AI Agent。
案例一:可复用的路由函数
案例概览
在之前的条件边示例中,路由函数返回的是节点名称字符串。但这样做有一个问题:如果节点名称改变,路由函数也要跟着改。
更好的做法是:路由函数返回布尔值或枚举,再用 mapping 映射到具体节点。这样路由逻辑和节点名称解耦,更易维护和复用。
核心概念:Boolean Routing
def routing_func(state: State) -> bool:
if state["score"] > 50:
return True # 高分
else:
return False # 低分
builder.add_conditional_edges(
START,
routing_func,
{
True: "node_high", # True → 高分节点
False: "node_low" # False → 低分节点
}
)mapping 参数的作用:
- 将路由函数的返回值(True/False)映射到实际节点名称
- 路由逻辑只关心"满足/不满足条件",不关心具体节点叫什么
- 节点重命名时,只需修改 mapping,不需要改路由函数
知识点拆解
1. 定义 State
from typing import TypedDict
from langchain_core.runnables import RunnableConfig
from langgraph.constants import START, END
from langgraph.graph import StateGraph
class State(TypedDict):
score: int2. 定义节点
def node_high(state: State, config: RunnableConfig):
# 高分路径:加 100 分
return {"score": state["score"] + 100}
def node_low(state: State, config: RunnableConfig):
# 低分路径:加 5 分
return {"score": state["score"] + 5}3. 路由函数返回布尔值
def routing_func(state: State) -> bool:
if state["score"] > 50:
return True # True → 走高分路径
else:
return False # False → 走低分路径关键点:
- 路由函数的返回值类型是
bool,而不是字符串 - 返回
True或False代表两种状态
4. 用 mapping 映射
builder.add_conditional_edges(
START,
routing_func,
{
True: "node_high",
False: "node_low"
}
)优势:
- 路由函数可以复用(只要返回 True/False 的逻辑相同)
- 不同 Graph 可以用同一个路由函数,映射到不同节点
运行示例
# 输入 score=30,30 <= 50,返回 False → node_low
print(graph.invoke({"score": 30}))
# 输出: {'score': 35}
# 输入 score=80,80 > 50,返回 True → node_high
print(graph.invoke({"score": 80}))
# 输出: {'score': 180}案例一完整代码
from typing import TypedDict
from langchain_core.runnables import RunnableConfig
from langgraph.constants import START, END
from langgraph.graph import StateGraph
class State(TypedDict):
score: int
def node_high(state: State, config: RunnableConfig):
return {"score": state["score"] + 100}
def node_low(state: State, config: RunnableConfig):
return {"score": state["score"] + 5}
def routing_func(state: State) -> bool:
if state["score"] > 50:
return True
else:
return False
builder = StateGraph(State)
builder.add_node("node_high", node_high)
builder.add_node("node_low", node_low)
builder.add_conditional_edges(
START,
routing_func,
{
True: "node_high",
False: "node_low"
}
)
builder.add_edge("node_high", END)
builder.add_edge("node_low", END)
graph = builder.compile()
# 测试
print(graph.invoke({"score": 30})) # {'score': 35}
print(graph.invoke({"score": 80})) # {'score': 180}
# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))案例二:Send 动态分发
案例概览
有时候,我们需要根据输入数据动态决定执行多少次。比如:
- 收到一批文件,每个文件需要单独处理
- 收到多个问题,每个问题需要单独回答
- 收到多个任务,需要并行处理
LangGraph 的 Send 机制就是为此设计的:路由函数可以返回多个 Send 指令,让同一个节点被并行触发多次。
核心概念:Send
from langgraph.types import Send
def routing_func(state: State):
result = []
for item in state["items"]:
# 每个 item 生成一个 Send 指令
result.append(Send("processor", {"item": item}))
return resultSend 的作用:
Send(node_name, private_state)表示"向 node_name 发送一个带私有状态的执行请求"- 返回多个 Send,LangGraph 会并行执行这些请求
- 每个 Send 携带的是私有状态(PrivateState),不是全局 State
知识点拆解
1. 定义全局 State 和私有 State
from typing import TypedDict, Annotated, List
from operator import add
class State(TypedDict):
items: Annotated[List[str], add] # 全局状态,自动累加
class PrivateState(TypedDict):
item: str # 私有状态,只传给单个节点执行两种 State 的区别:
| 类型 | 作用 | 生命周期 |
|---|---|---|
| State(全局) | 所有节点共享,会被持久化 | 整个 Graph 执行期间 |
| PrivateState(私有) | 只传给单次节点执行 | 单次节点执行 |
2. 定义处理节点
def processor(state: PrivateState) -> State:
res = state["item"] + " ✓"
return {"items": [res]} # 会 append 到全局 items注意:
- 节点接收的是
PrivateState(私有状态) - 节点返回的是
State(全局状态更新)
3. 路由函数返回 Send 列表
def routing_func(state: State):
result = []
for item in state["items"]:
result.append(Send("processor", {"item": item}))
return result执行过程:
- 假设输入
items = ["苹果", "香蕉", "橙子", "葡萄"] - routing_func 生成 4 个 Send 指令
- LangGraph 并行执行 4 次 processor
- 每次执行返回一个处理后的结果
- 结果被 append 到全局 items
运行示例
result = graph.invoke({"items": ["苹果", "香蕉", "橙子", "葡萄"]})
print(result)
# {'items': ['苹果', '香蕉', '橙子', '葡萄', '苹果 ✓', '香蕉 ✓', '橙子 ✓', '葡萄 ✓']}案例二完整代码
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph
from langgraph.types import Send
from langgraph.constants import START, END
from operator import add
class State(TypedDict):
items: Annotated[List[str], add]
class PrivateState(TypedDict):
item: str
def processor(state: PrivateState) -> State:
res = state["item"] + " ✓"
return {"items": [res]}
def routing_func(state: State):
result = []
for item in state["items"]:
result.append(Send("processor", {"item": item}))
return result
builder = StateGraph(State)
builder.add_node("processor", processor)
builder.add_conditional_edges(START, routing_func, ["processor"])
builder.add_edge("processor", END)
graph = builder.compile()
# 测试
result = graph.invoke({"items": ["苹果", "香蕉", "橙子", "葡萄"]})
print(result)
# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))Send 进阶:多目标分发
Send 不仅可以向同一个节点发送多次,还可以向不同节点分发:
def smart_router(state: State):
sends = []
for task in state["tasks"]:
if task["type"] == "text":
sends.append(Send("text_processor", {"task": task}))
elif task["type"] == "image":
sends.append(Send("image_processor", {"task": task}))
else:
sends.append(Send("default_processor", {"task": task}))
return sends应用场景:
- 根据任务类型分发到不同的处理器
- 实现 Map-Reduce 模式:先并行处理,再汇总结果
- 构建多 Agent 协作系统
Send vs 条件边
| 特性 | Send | 条件边 |
|---|---|---|
| 触发数量 | 可以触发多次 | 只能走一条路 |
| 并行能力 | 天然支持并行 | 串行执行 |
| 状态传递 | 可传私有状态 | 共享全局状态 |
| 典型场景 | 批量处理、fan-out | 分支选择 |
案例三:Command API
案例概览
在前面的例子中,节点通过返回 State 更新来控制数据流,而流程走向由 Edge 决定。
但有时候,我们希望节点自己决定下一步去哪。LangGraph 1.x 引入的 Command API 就提供了这种能力。
核心概念:Command
from langgraph.types import Command
def node(state: State):
return Command(
goto=END, # 显式指定下一步去哪
update={"tasks": [...]} # 更新 State
)Command 的两个核心参数:
goto:显式指定下一个节点(覆盖边的自动路由)update:要更新的状态字段
知识点拆解
1. 普通返回 vs Command 返回
普通返回(之前的方式):
def node(state: State):
return {"tasks": ["完成"]} # 只更新状态,走默认边Command 返回(命令式控制):
def node(state: State):
return Command(
goto=END, # 覆盖默认边,直接去 END
update={"tasks": ["完成"]} # 同时更新状态
)2. Command 的优势
| 特性 | 普通返回 | Command 返回 |
|---|---|---|
| 状态更新 | ✅ 支持 | ✅ 支持 |
| 控制流向 | ❌ 只能走预定义的边 | ✅ 可以动态决定 |
| 条件跳转 | 需要条件边 | 直接在节点内实现 |
3. 使用场景
- 早期终止:处理完成后直接跳到 END
- 错误处理:出错时跳到错误处理节点
- 循环控制:满足条件时跳回某个节点
运行示例
result = graph.invoke(
{"tasks": ["学习Python", "写代码", "测试程序", "部署上线"]}
)
print(result)
# {'tasks': ['学习Python', '写代码', '测试程序', '部署上线',
# '学习Python [完成]', '写代码 [完成]', '测试程序 [完成]', '部署上线 [完成]']}案例三完整代码
from operator import add
from typing import TypedDict, Annotated
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import Command
class State(TypedDict):
tasks: Annotated[list[str], add]
def task_processor(state: State):
completed_tasks = []
for task in state["tasks"]:
completed_tasks.append(task + " [完成]")
return Command(
goto=END,
update={"tasks": completed_tasks}
)
builder = StateGraph(State)
builder.add_node("task_processor", task_processor)
builder.add_edge(START, "task_processor")
graph = builder.compile()
# 测试
result = graph.invoke(
{"tasks": ["学习Python", "写代码", "测试程序", "部署上线"]}
)
print(result)
# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))案例四:Command 循环控制
案例概览
Command 不仅可以跳到 END,还可以跳到任意节点,甚至跳回自己实现循环。
这在需要重试、迭代处理的场景中非常有用。
核心概念:动态 goto
def step_two(state: State):
current_step = state["step"]
if current_step < 3:
next_node = "step_two" # 循环回自己
else:
next_node = END # 结束
return Command(
goto=next_node,
update={"step": current_step + 1}
)执行流程图解
初始状态: step=0
step_one:
- 设置 step=1
- goto="step_two"
↓
step_two (第1次):
- step=1 < 3,继续循环
- 设置 step=2
- goto="step_two"
↓
step_two (第2次):
- step=2 < 3,继续循环
- 设置 step=3
- goto="step_two"
↓
step_two (第3次):
- step=3 >= 3,结束
- goto=END
↓
结束案例四完整代码
from operator import add
from typing import TypedDict, Annotated
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import Command
class State(TypedDict):
logs: Annotated[list[str], add]
step: int
def step_one(state: State):
return Command(
goto="step_two",
update={
"logs": ["步骤一:初始化完成"],
"step": 1
}
)
def step_two(state: State):
current_step = state["step"]
if current_step < 3:
next_node = "step_two"
else:
next_node = END
return Command(
goto=next_node,
update={
"logs": [f"步骤二:处理中... (第{current_step + 1}次)"],
"step": current_step + 1
}
)
builder = StateGraph(State)
builder.add_node("step_one", step_one)
builder.add_node("step_two", step_two)
builder.add_edge(START, "step_one")
graph = builder.compile()
# 测试
result = graph.invoke({"logs": ["流程启动"], "step": 0})
print("执行日志:")
for log in result["logs"]:
print(f" - {log}")
# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
# 输出:
# - 流程启动
# - 步骤一:初始化完成
# - 步骤二:处理中... (第2次)
# - 步骤二:处理中... (第3次)
# - 步骤二:处理中... (第4次)案例五:Subgraph 子图嵌套
案例概览
当 Graph 变得复杂时,我们可以将其拆分成多个子图,然后在父图中调用。
这就像编程中的"函数封装":
- 子图:封装一个独立的处理流程
- 父图:组合多个子图,形成完整流程
核心概念:Subgraph 作为节点
# 1. 构建子图
subgraph_builder = StateGraph(State)
subgraph_builder.add_node("sub_processor", sub_processor)
subgraph_builder.add_edge(START, "sub_processor")
subgraph_builder.add_edge("sub_processor", END)
subgraph = subgraph_builder.compile()
# 2. 在父图中使用子图作为节点
builder = StateGraph(State)
builder.add_node("subgraph_node", subgraph) # 子图作为节点
builder.add_edge(START, "subgraph_node")
builder.add_edge("subgraph_node", END)
graph = builder.compile()知识点拆解
1. 子图的独立性
子图是一个完整的 Graph:
- 有自己的 START 和 END
- 可以独立 compile 和 invoke
- 可以单独测试
2. 父图调用子图
当父图执行到 subgraph_node 时:
- 将当前 State 传给子图
- 子图执行完整流程(START → ... → END)
- 子图的输出状态合并回父图
3. State 共享
父图和子图通常使用相同的 State 类型:
- 子图可以读取父图传入的状态
- 子图的更新会反映到父图
运行示例
result = graph.invoke({"logs": ["[父图] 开始执行"]})
print("执行日志:")
for log in result["logs"]:
print(f" - {log}")
# 输出:
# - [父图] 开始执行
# - [父图] 开始执行(子图收到的初始状态)
# - [子图] 数据处理完成案例五完整代码
from operator import add
from typing import TypedDict, Annotated
from langgraph.constants import END, START
from langgraph.graph import StateGraph
class State(TypedDict):
logs: Annotated[list[str], add]
def sub_processor(state: State) -> dict:
return {"logs": ["[子图] 数据处理完成"]}
# 构建子图
subgraph_builder = StateGraph(State)
subgraph_builder.add_node("sub_processor", sub_processor)
subgraph_builder.add_edge(START, "sub_processor")
subgraph_builder.add_edge("sub_processor", END)
subgraph = subgraph_builder.compile()
# 构建父图
builder = StateGraph(State)
builder.add_node("subgraph_node", subgraph)
builder.add_edge(START, "subgraph_node")
builder.add_edge("subgraph_node", END)
graph = builder.compile()
# 测试
result = graph.invoke({"logs": ["[父图] 开始执行"]})
print("执行日志:")
for log in result["logs"]:
print(f" - {log}")
# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))深入理解:四种边的本质区别
LangGraph 提供了四种控制流程的方式,它们各有特点:
1. 普通边(Normal Edge)
builder.add_edge("node_a", "node_b")特点:无条件跳转,适合固定流程。
2. 条件边(Conditional Edge)
builder.add_conditional_edges(
"node_a",
routing_func,
{"case1": "node_b", "case2": "node_c"}
)特点:根据状态动态选择一条路径。
3. Send 边
def router(state):
return [Send("worker", {"task": t}) for t in state["tasks"]]特点:可以触发多个并行执行,每个执行携带独立的私有状态。
4. Command
def node(state):
return Command(goto="next_node", update={"key": "value"})特点:在节点内部决定流向,覆盖预定义的边。
选择指南
需要做决策吗?
├── 否 → 普通边
└── 是 → 决策点在哪?
├── 边上(路由函数)
│ ├── 选一条路 → 条件边
│ └── 多条并行 → Send
└── 节点内部 → Command组合使用示例
实际项目中,这些机制经常组合使用:
def smart_processor(state: State):
# 根据状态决定行为
if state["error_count"] > 3:
# 使用 Command 直接跳转到错误处理
return Command(goto="error_handler", update={"status": "failed"})
if state["needs_review"]:
# 正常返回状态,让条件边决定下一步
return {"processed": True}
# 使用 Command 跳过审核直接完成
return Command(goto=END, update={"processed": True})本章总结
本节介绍了 LangGraph 的高级路由和控制流机制:
| 机制 | 核心特点 | 适用场景 |
|---|---|---|
| Boolean Routing | 路由函数返回布尔/枚举,用 mapping 映射 | 可复用的路由逻辑 |
| Send | 动态生成多个执行指令,并行处理 | 批量处理、Map-Reduce |
| Command | 节点内显式指定下一步 | 动态流程控制、循环 |
| Subgraph | 子图作为节点,模块化设计 | 复杂流程拆分 |
选择指南
- 简单条件分支 → 用普通条件边
- 可复用的分支逻辑 → 用 Boolean Routing
- 并行处理多个任务 → 用 Send
- 节点内动态决定流向 → 用 Command
- 复杂流程模块化 → 用 Subgraph
思考题
- 如果 Send 返回的是空列表
[],会发生什么? - Command 的
goto可以跳到还没执行过的节点吗? - 子图和父图可以使用不同的 State 类型吗?如果可以,怎么做?
- 如何用 Command 实现一个最多重试 3 次的错误处理机制?
下一步
掌握了这些高级特性后,你已经可以构建相当复杂的 AI Agent 了。接下来的章节会介绍:
- Human-in-the-loop:人机协作
- Persistence:状态持久化
- Streaming:流式输出
这些特性将让你的 Agent 更加实用和强大!