LangGraph Dynamic Breakpoints 详细解读
📚 概述
本文档详细解读 LangGraph 中的 Dynamic Breakpoints(动态断点) 功能。这是一种高级的人机交互(Human-in-the-Loop)技术,允许图在运行时根据特定条件自动中断,而不是由开发者预先在节点上设置固定的断点。
📚 术语表
| 术语名称 | LangGraph 定义和解读 | Python 定义和说明 | 重要程度 |
|---|---|---|---|
| Dynamic Breakpoint | 运行时条件性中断机制,节点内部根据业务逻辑动态决定是否暂停,而非编译时固定设置 | 通过抛出 NodeInterrupt 异常实现 | ⭐⭐⭐⭐⭐ |
| NodeInterrupt | 动态断点的核心类,在节点内部抛出此异常可触发中断,并传递中断原因信息 | from langgraph.errors import NodeInterrupt | ⭐⭐⭐⭐⭐ |
| raise NodeInterrupt | 触发动态中断的语法,传入字符串参数作为中断原因,立即停止节点执行并暂停图 | raise NodeInterrupt("中断原因说明") | ⭐⭐⭐⭐⭐ |
| state.tasks | 状态对象的任务列表属性,包含待执行任务和中断信息(interrupts)的详细数据 | state.tasks[0].interrupts 访问中断详情 | ⭐⭐⭐⭐ |
| Interrupt.value | 中断对象的值属性,存储通过 NodeInterrupt 传递的中断原因字符串 | state.tasks[0].interrupts[0].value | ⭐⭐⭐⭐ |
| Interrupt.when | 中断发生时机,值为 'during'(节点执行期间/动态断点)或 'before'(节点执行前/静态断点) | 字符串枚举值: "during" 或 "before" | ⭐⭐⭐ |
| update_state() | 更新图状态的方法,用于修改中断时的状态,是从中断恢复的必要步骤 | graph.update_state(config, new_state) | ⭐⭐⭐⭐⭐ |
| 条件性中断 | 基于业务规则的智能中断,只在满足特定条件时触发,如输入过长、风险评分高等 | if 条件判断 + raise NodeInterrupt | ⭐⭐⭐⭐⭐ |
| 信息传递机制 | 动态断点可携带结构化信息(如 JSON),向用户传递详细的中断原因和建议 | 通过 NodeInterrupt 参数传递字符串或 JSON | ⭐⭐⭐⭐ |
| 状态恢复循环 | 中断后若不修改状态直接恢复,会重新触发相同中断,形成循环,必须修改状态才能通过 | Python 异常处理和状态管理机制 | ⭐⭐⭐⭐ |
| PregelTask | 任务对象,包含节点名称、错误信息、中断列表等,用于追踪图执行状态 | LangGraph 内部数据结构 | ⭐⭐⭐ |
🎯 核心概念
什么是 Dynamic Breakpoints?
Dynamic Breakpoints(动态断点)是 LangGraph 提供的一种内部中断机制,与传统的静态断点不同:
| 特性 | 静态断点 | 动态断点 |
|---|---|---|
| 设置时机 | 图编译时预先设置 | 运行时根据条件触发 |
| 触发方式 | 固定在特定节点 | 节点内部逻辑决定 |
| 灵活性 | 每次执行都会中断 | 可以条件性中断 |
| 信息传递 | 无法传递中断原因 | 可以传递详细信息 |
为什么需要 Dynamic Breakpoints?
回顾人机交互的三大应用场景:
- Approval(审批) - 中断后等待用户批准某个操作
- Debugging(调试) - 回溯图的执行过程,重现或避免问题
- Editing(编辑) - 修改状态,注入人类反馈
Dynamic Breakpoints 特别适合以下场景:
✅ 条件性中断:只在满足特定条件时才中断(如输入过长、检测到敏感内容、风险评分过高)
✅ 信息传递:告诉用户为什么会中断(如"输入超过字数限制"、"检测到不安全内容")
✅ 智能控制:根据业务逻辑动态决定是否需要人工介入
🎭 实战案例:条件长度检查系统
我们将构建一个简单的三步处理流程,当输入长度超过 5 个字符时自动中断,要求人工审核。
系统需求
流程:
- Step 1:接收并处理输入
- Step 2:检查输入长度,如果超过 5 个字符则中断
- Step 3:继续处理(只有通过 Step 2 后才执行)
系统架构图
用户输入
↓
[step_1] 处理输入
↓
[step_2] 检查长度
↓ (len > 5 ?)
├─ Yes → 抛出 NodeInterrupt (中断并传递原因)
├─ No → 继续
↓
[step_3] 最终处理
↓
END🔧 代码实现详解
1. 导入依赖
from IPython.display import Image, display
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt # ⭐ 核心:动态中断
from langgraph.graph import START, END, StateGraph关键导入:
NodeInterrupt:这是实现动态断点的核心类MemorySaver:用于保存状态,支持中断后恢复
2. 定义状态
class State(TypedDict):
input: str # 简单状态:只包含输入字符串说明:
- 这是一个极简的状态定义,只包含用户输入
- 实际应用中可以包含更多字段(如处理结果、元数据等)
3. 实现三个处理节点
Step 1:初始处理
def step_1(state: State) -> State:
print("---Step 1---")
return state # 直接返回状态,不做修改功能: 这是第一个处理步骤,这里只是简单输出日志。
Step 2:条件检查与动态中断 ⭐
def step_2(state: State) -> State:
# 检查输入长度
if len(state['input']) > 5:
# 抛出 NodeInterrupt,传递中断原因
raise NodeInterrupt(f"Received input that is longer than 5 characters: {state['input']}")
print("---Step 2---")
return state核心机制详解:
raise NodeInterrupt(message)
# ^^^^^^^^^^^^^ ^^^^^^^
# 动态中断类 中断信息(传递给用户)关键点:
- 条件判断:
len(state['input']) > 5- 只在满足条件时中断 - 抛出异常:使用
raise NodeInterrupt触发中断 - 传递信息:字符串参数会被记录到状态中,用户可以看到中断原因
- 执行停止:一旦抛出,节点的后续代码不会执行,图会立即暂停
与 Python 异常处理的关系:
NodeInterrupt 是一个特殊的异常类,但它不是用来处理错误的,而是用来有意地暂停执行:
# 普通异常 - 表示错误
try:
result = risky_operation()
except Exception as e:
handle_error(e)
# NodeInterrupt - 表示需要人工介入
if needs_human_review(data):
raise NodeInterrupt("需要人工审核") # 不需要 try-exceptStep 3:最终处理
def step_3(state: State) -> State:
print("---Step 3---")
return state功能: 只有通过 Step 2 的检查后,才会执行这一步。
4. 构建图
builder = StateGraph(State)
# 添加三个节点
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
# 添加边:定义执行顺序
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)
# 设置内存检查点(必需!)
memory = MemorySaver()
# 编译图
graph = builder.compile(checkpointer=memory)重要提醒:
checkpointer=memory是必需的!- 没有 checkpointer,图无法在中断后恢复
- Memory Saver 会在每个节点后保存状态快照
5. 执行图:触发动态中断
initial_input = {"input": "hello world"} # 11 个字符,超过 5
thread_config = {"configurable": {"thread_id": "1"}}
# 运行图
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
print(event)输出:
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}分析:
- 图开始执行,状态为
{"input": "hello world"} step_1执行并输出日志step_2开始执行,检测到长度超过 5,抛出NodeInterrupt- 图立即暂停,不会执行
step_3
6. 检查中断状态
查看下一个待执行节点
state = graph.get_state(thread_config)
print(state.next)输出:
('step_2',)说明:
state.next显示下一个要执行的节点是step_2- 因为
step_2中断了,它会重新执行(除非状态被修改)
查看中断信息
print(state.tasks)输出:
(PregelTask(
id='6eb3910d-e231-5ba2-b25e-28ad575690bd',
name='step_2',
error=None,
interrupts=(
Interrupt(
value='Received input that is longer than 5 characters: hello world',
when='during'
),
),
state=None
),)关键信息解读:
| 字段 | 值 | 说明 |
|---|---|---|
name | 'step_2' | 中断发生在哪个节点 |
error | None | 这不是错误,所以为 None |
interrupts | Interrupt(...) | 中断详情 |
value | 'Received input...' | 我们传递的中断原因 |
when | 'during' | 中断发生在节点执行期间 |
when 字段说明:
'during':中断发生在节点内部(动态断点)'before':中断发生在节点之前(静态断点)
7. 尝试恢复执行(失败案例)
# 尝试继续执行
for event in graph.stream(None, thread_config, stream_mode="values"):
print(event)输出:
{'input': 'hello world'}再次检查状态:
state = graph.get_state(thread_config)
print(state.next)输出:
('step_2',)问题分析:
- 调用
graph.stream(None, ...)会重新执行step_2 - 但状态没有改变,
input仍然是"hello world" - 所以再次触发
NodeInterrupt,陷入循环!
教训: 必须修改状态才能通过中断点。
8. 修改状态并恢复执行(成功案例)
更新状态
graph.update_state(
thread_config,
{"input": "hi"}, # 修改输入为短字符串(2 个字符)
)输出:
{
'configurable': {
'thread_id': '1',
'checkpoint_ns': '',
'checkpoint_id': '1ef6a434-06cf-6f1e-8002-0ea6dc69e075'
}
}说明:
update_state会修改当前状态- 返回的是新的 checkpoint ID
- 下次执行时会使用新状态
继续执行
for event in graph.stream(None, thread_config, stream_mode="values"):
print(event)输出:
{'input': 'hi'}
---Step 2---
{'input': 'hi'}
---Step 3---
{'input': 'hi'}成功!分析:
- 从
step_2重新开始执行 - 此时
state['input']是"hi"(2 个字符) - 长度检查通过,不会抛出
NodeInterrupt - 继续执行
step_3,完成整个流程
🎓 核心知识点总结
LangGraph 特有概念
1. NodeInterrupt 类
语法:
from langgraph.errors import NodeInterrupt
raise NodeInterrupt(message: str)特点:
- 可以在节点内部的任何位置抛出
- 必须传递一个字符串参数(中断原因)
- 会立即停止节点执行
- 中断信息会保存到
state.tasks[].interrupts
与静态断点的对比:
# 静态断点 - 在编译时设置
graph = builder.compile(
checkpointer=memory,
interrupt_before=["step_2"] # 总是在 step_2 之前中断
)
# 动态断点 - 在运行时触发
def step_2(state):
if some_condition(state):
raise NodeInterrupt("条件满足,需要人工审核") # 条件性中断
return state2. 中断后的状态管理
state.next
显示下一个待执行的节点:
state = graph.get_state(thread_config)
print(state.next) # ('step_2',)state.tasks
包含所有待执行任务的详细信息:
state.tasks[0].name # 节点名称
state.tasks[0].interrupts # 中断信息列表
state.tasks[0].error # 错误信息(如果有)3. 状态恢复流程
1. 中断发生
↓
2. 调用 get_state() 检查状态
↓
3. 调用 update_state() 修改状态
↓
4. 调用 stream(None, ...) 继续执行
↓
5. 从中断点重新开始关键细节:
stream(None, ...)中的None表示不提供新输入,使用当前状态- 如果状态未修改,会重新执行相同的逻辑(可能再次中断)
Python 特有知识点
1. TypedDict 基础
from typing_extensions import TypedDict
class State(TypedDict):
input: str作用:
- 定义字典的结构和类型
- 提供 IDE 自动补全和类型检查
- LangGraph 使用它来验证状态
使用示例:
# 创建状态
state: State = {"input": "hello"}
# 类型检查会捕获错误
state = {"input": 123} # ❌ 错误:应该是 str
state = {"output": "hi"} # ❌ 错误:缺少 'input' 键2. Python 异常机制
虽然 NodeInterrupt 是一个异常,但在 LangGraph 中不需要手动捕获:
# ✅ 正确用法
def step_2(state):
if condition:
raise NodeInterrupt("reason") # LangGraph 会自动处理
return state
# ❌ 不需要这样做
def step_2(state):
try:
if condition:
raise NodeInterrupt("reason")
except NodeInterrupt as e:
# 不需要捕获!LangGraph 会自动处理
pass原理: LangGraph 的执行引擎会捕获 NodeInterrupt,保存状态,然后优雅地暂停执行。
3. 字符串格式化(f-string)
raise NodeInterrupt(f"Received input that is longer than 5 characters: {state['input']}")f-string 语法:
- 以
f开头的字符串 - 使用
{}插入变量或表达式 - 在运行时自动计算和格式化
等价写法:
# f-string (推荐)
message = f"Input: {state['input']}, Length: {len(state['input'])}"
# .format() (传统)
message = "Input: {}, Length: {}".format(state['input'], len(state['input']))
# % 格式化 (老式)
message = "Input: %s, Length: %d" % (state['input'], len(state['input']))💡 最佳实践
1. 何时使用动态断点?
✅ 适用场景:
- 内容审核:检测敏感词、违规内容时中断
- 风险评估:风险评分超过阈值时需要人工审核
- 资源限制:输入过大、请求过多时中断
- 用户确认:需要根据 AI 的判断决定是否需要用户确认
- 调试模式:在开发时根据日志级别动态中断
❌ 不适用场景:
- 总是需要中断的节点(用静态断点更简单)
- 节点之间的流程控制(用条件边)
- 错误处理(用 try-except)
2. 动态断点的信息传递技巧
技巧 1:结构化信息
import json
def check_content(state):
risk_score = calculate_risk(state['input'])
if risk_score > 0.8:
# 传递 JSON 格式的详细信息
info = {
"reason": "高风险内容",
"risk_score": risk_score,
"detected_issues": ["暴力", "敏感词"],
"suggestion": "建议人工审核或拒绝"
}
raise NodeInterrupt(json.dumps(info, ensure_ascii=False))
return state用户端解析:
state = graph.get_state(thread_config)
interrupt_value = state.tasks[0].interrupts[0].value
info = json.loads(interrupt_value)
print(f"风险评分: {info['risk_score']}")
print(f"建议: {info['suggestion']}")技巧 2:多条件检查
def validate_input(state):
errors = []
# 检查 1:长度
if len(state['input']) > 1000:
errors.append("输入超过 1000 字符")
# 检查 2:格式
if not is_valid_format(state['input']):
errors.append("格式不符合要求")
# 检查 3:内容
if contains_profanity(state['input']):
errors.append("包含不当内容")
if errors:
raise NodeInterrupt(f"验证失败: {', '.join(errors)}")
return state技巧 3:提供修复建议
def check_data(state):
data = state['data']
if missing_fields := [f for f in required_fields if f not in data]:
suggestion = f"请补充以下字段: {', '.join(missing_fields)}"
raise NodeInterrupt(f"数据不完整。{suggestion}")
return state3. 状态修改策略
策略 1:最小修改原则
只修改必要的字段,保留其他信息:
# ✅ 好的做法 - 只修改需要的字段
graph.update_state(thread_config, {"input": "hi"})
# ❌ 避免 - 覆盖整个状态(可能丢失其他信息)
graph.update_state(thread_config, {"input": "hi", "other_field": None})策略 2:保留历史记录
class State(TypedDict):
input: str
original_input: str # 保留原始输入
modification_history: list # 记录修改历史
def step_2(state):
if len(state['input']) > 5:
raise NodeInterrupt(f"输入过长: {state['input']}")
return state
# 修改时保留历史
state = graph.get_state(thread_config)
original = state.values['input']
graph.update_state(thread_config, {
"input": "hi",
"original_input": original,
"modification_history": [{"from": original, "to": "hi", "reason": "长度超限"}]
})策略 3:条件性修改
# 根据中断原因决定如何修改
state = graph.get_state(thread_config)
interrupt_msg = state.tasks[0].interrupts[0].value
if "长度" in interrupt_msg:
# 截断输入
graph.update_state(thread_config, {"input": original[:5]})
elif "格式" in interrupt_msg:
# 修正格式
graph.update_state(thread_config, {"input": fix_format(original)})4. 与 LangGraph API/Studio 集成
在实际应用中,通常会通过 API 与图交互。以下是完整的工作流程:
连接到 LangGraph 服务
from langgraph_sdk import get_client
# 连接到本地开发服务器
URL = "http://127.0.0.1:2024"
client = get_client(url=URL)
# 搜索可用的图
assistants = await client.assistants.search()创建线程并执行
# 创建新线程
thread = await client.threads.create()
# 执行图
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="dynamic_breakpoints",
input={"input": "hello world"},
stream_mode="values"
):
print(f"Event type: {chunk.event}")
print(chunk.data)输出示例:
Event type: metadata
{'run_id': '1ef6a43a-1b04-64d0-9a79-1caff72c8a89'}
Event type: values
{'input': 'hello world'}
Event type: values
{'input': 'hello world'}检查中断状态
# 获取当前状态
current_state = await client.threads.get_state(thread['thread_id'])
print(current_state['next']) # ['step_2']
print(current_state['tasks'][0]['interrupts']) # 中断详情更新状态并恢复
# 更新状态
await client.threads.update_state(
thread['thread_id'],
{"input": "hi!"}
)
# 继续执行(传入 None)
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="dynamic_breakpoints",
input=None, # ⭐ None 表示使用当前状态
stream_mode="values"
):
print(chunk.data)输出:
{'input': 'hi!'}
---Step 2---
{'input': 'hi!'}
---Step 3---
{'input': 'hi!'}🚀 进阶技巧
1. 多级中断条件
def advanced_check(state):
input_text = state['input']
# 第一级:严重问题 - 立即拒绝
if contains_illegal_content(input_text):
raise NodeInterrupt(
"严重违规:检测到非法内容,已自动拒绝。"
)
# 第二级:中等风险 - 需要审核
risk = calculate_risk(input_text)
if risk > 0.7:
raise NodeInterrupt(
f"中等风险 (评分: {risk}):建议人工审核。"
)
# 第三级:轻微问题 - 警告但继续
if risk > 0.4:
print(f"⚠️ 警告:检测到轻微风险 (评分: {risk})")
return state2. 带超时的中断
from datetime import datetime, timedelta
class State(TypedDict):
input: str
interrupt_timestamp: str
max_wait_seconds: int
def timed_check(state):
if needs_approval(state['input']):
timestamp = datetime.now().isoformat()
raise NodeInterrupt(
f"需要审批。超时时间:{state.get('max_wait_seconds', 3600)} 秒"
)
return state
# 恢复时检查超时
state = graph.get_state(thread_config)
if state.tasks and state.tasks[0].interrupts:
interrupt_time = datetime.fromisoformat(state.values['interrupt_timestamp'])
if datetime.now() - interrupt_time > timedelta(seconds=state.values['max_wait_seconds']):
# 超时处理:自动拒绝或使用默认值
graph.update_state(thread_config, {"input": "TIMEOUT_REJECTED"})3. 中断后的多路径选择
class State(TypedDict):
input: str
approval_decision: str # "approve", "reject", "modify"
def check_content(state):
if needs_review(state['input']):
raise NodeInterrupt("内容需要审核,请选择操作")
return state
def handle_decision(state):
decision = state.get('approval_decision', 'reject')
if decision == 'approve':
return {"status": "approved"}
elif decision == 'modify':
return {"input": state.get('modified_input', state['input'])}
else:
return {"status": "rejected"}
# 用户决策后更新
graph.update_state(thread_config, {
"approval_decision": "modify",
"modified_input": "修改后的内容"
})4. 组合静态与动态断点
# 编译时设置静态断点
graph = builder.compile(
checkpointer=memory,
interrupt_before=["final_step"] # 总是在最后一步前中断
)
# 节点中使用动态断点
def middle_step(state):
if high_risk(state['data']):
raise NodeInterrupt("检测到高风险,需要额外审核") # 动态中断
return state组合效果:
middle_step:只在高风险时中断final_step:总是中断(最终确认)
📊 静态 vs 动态断点对比
| 特性 | 静态断点 | 动态断点 |
|---|---|---|
| 设置方式 | interrupt_before=["node"] | raise NodeInterrupt() |
| 中断时机 | 节点执行前 | 节点执行中 |
| 触发条件 | 总是触发 | 根据逻辑条件 |
| 信息传递 | 无 | 可以传递详细信息 |
| 灵活性 | 低(固定节点) | 高(动态决定) |
| 适用场景 | 固定审批点 | 条件性审核 |
实际应用建议:
# 静态断点 - 用于固定流程
interrupt_before=["deploy", "payment"] # 总是需要确认的步骤
# 动态断点 - 用于智能判断
def content_filter(state):
if ai_detects_issue(state['content']): # AI 判断
raise NodeInterrupt("AI 检测到潜在问题")
return state🎯 实际应用案例
案例 1:内容审核系统
class ContentState(TypedDict):
content: str
category: str
risk_score: float
def content_moderation(state: ContentState):
# 使用 AI 模型评估内容
result = moderation_model.analyze(state['content'])
state['risk_score'] = result.risk_score
# 高风险:立即拒绝
if result.risk_score > 0.9:
raise NodeInterrupt(
f"高风险内容 (评分: {result.risk_score}),已自动拒绝。"
f"检测到的问题: {', '.join(result.issues)}"
)
# 中风险:人工审核
if result.risk_score > 0.5:
raise NodeInterrupt(
f"中风险内容 (评分: {result.risk_score}),需要人工审核。"
f"可疑点: {', '.join(result.warnings)}"
)
# 低风险:自动通过
return state
# 人工审核后的处理
def handle_review(state):
decision = state.get('review_decision')
if decision == 'approve':
return {"status": "approved"}
elif decision == 'reject':
return {"status": "rejected"}
else:
return {"status": "needs_modification"}案例 2:金融交易审批
class TransactionState(TypedDict):
amount: float
account: str
risk_factors: list
approval_required: bool
def transaction_check(state: TransactionState):
amount = state['amount']
# 规则 1:大额交易
if amount > 10000:
raise NodeInterrupt(
f"大额交易需审批:金额 ${amount:,.2f} 超过限额 $10,000"
)
# 规则 2:异常模式
if detect_anomaly(state['account']):
risk_factors = analyze_risk(state)
raise NodeInterrupt(
f"检测到异常交易模式。风险因素: {', '.join(risk_factors)}"
)
# 规则 3:新账户
if is_new_account(state['account']):
raise NodeInterrupt(
f"新账户首次大额交易,需要身份验证"
)
return state案例 3:AI 助手的敏感操作确认
class AssistantState(TypedDict):
user_request: str
planned_action: str
destructive: bool
def plan_execution(state: AssistantState):
action = parse_action(state['user_request'])
state['planned_action'] = action
# 检测破坏性操作
destructive_keywords = ['delete', 'remove', 'drop', 'truncate']
if any(kw in action.lower() for kw in destructive_keywords):
state['destructive'] = True
raise NodeInterrupt(
f"⚠️ 检测到潜在破坏性操作: {action}\n"
f"此操作可能无法撤销,需要您的明确确认。"
)
return state
# 用户确认后继续
graph.update_state(thread_config, {"user_confirmed": True})案例 4:多步骤验证流程
class VerificationState(TypedDict):
user_id: str
verification_steps: list
current_step: int
def verification_flow(state: VerificationState):
steps = ['identity', 'email', 'phone', 'document']
current = state['current_step']
if current < len(steps):
step_name = steps[current]
raise NodeInterrupt(
f"验证步骤 {current + 1}/{len(steps)}: 请完成{step_name}验证"
)
return {"verification_complete": True}
# 用户完成每一步后更新
for step in range(4):
# 用户完成验证
user_completes_verification(step)
# 更新状态
graph.update_state(thread_config, {"current_step": step + 1})
# 继续执行
graph.stream(None, thread_config)完整案例代码(可直接运行)
以下是一个完整的、可以直接在 Jupyter Notebook 中运行的代码示例,演示 Dynamic Breakpoints 的核心功能:
# ============================================================
# LangGraph Dynamic Breakpoints 完整示例
# 演示:条件性动态中断(基于输入长度)
# ============================================================
# --------------------------
# 1. 导入必要的库
# --------------------------
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from langgraph.graph import START, END, StateGraph
from IPython.display import Image, display
# --------------------------
# 2. 定义状态
# --------------------------
class State(TypedDict):
input: str
# --------------------------
# 3. 定义节点函数
# --------------------------
def step_1(state: State) -> State:
"""第一步:初始处理"""
print("---Step 1: 处理输入---")
return state
def step_2(state: State) -> State:
"""第二步:条件检查,超过阈值时动态中断"""
# 检查输入长度
if len(state['input']) > 5:
# 抛出 NodeInterrupt 触发动态中断
raise NodeInterrupt(
f"输入长度超过限制!当前长度: {len(state['input'])} 字符 (最大: 5)\n"
f"原始输入: '{state['input']}'\n"
f"建议:请缩短输入后重试"
)
print("---Step 2: 长度检查通过---")
return state
def step_3(state: State) -> State:
"""第三步:最终处理"""
print("---Step 3: 处理完成---")
return state
# --------------------------
# 4. 构建图
# --------------------------
builder = StateGraph(State)
# 添加节点
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
# 添加边:定义执行顺序
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)
# 编译图(必须使用 checkpointer!)
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# --------------------------
# 5. 可视化图结构
# --------------------------
print("📊 图结构可视化:")
display(Image(graph.get_graph().draw_mermaid_png()))
# --------------------------
# 6. 测试场景 1:短输入(不触发中断)
# --------------------------
print("\n" + "=" * 60)
print("📗 场景 1:短输入 'hi'(不触发中断)")
print("=" * 60)
thread1 = {"configurable": {"thread_id": "short-input"}}
for event in graph.stream({"input": "hi"}, thread1, stream_mode="values"):
print(f"状态: {event}")
state1 = graph.get_state(thread1)
print(f"\n执行结果: next={state1.next}, 已完成")
# --------------------------
# 7. 测试场景 2:长输入(触发动态中断)
# --------------------------
print("\n" + "=" * 60)
print("📕 场景 2:长输入 'hello world'(触发动态中断)")
print("=" * 60)
thread2 = {"configurable": {"thread_id": "long-input"}}
for event in graph.stream({"input": "hello world"}, thread2, stream_mode="values"):
print(f"状态: {event}")
# --------------------------
# 8. 检查中断状态
# --------------------------
print("\n" + "-" * 40)
print("🔍 检查中断状态")
print("-" * 40)
state2 = graph.get_state(thread2)
print(f"下一个待执行节点: {state2.next}")
print(f"当前状态值: {state2.values}")
# 查看中断详情
if state2.tasks and state2.tasks[0].interrupts:
interrupt = state2.tasks[0].interrupts[0]
print(f"\n📌 中断信息:")
print(f" 节点: {state2.tasks[0].name}")
print(f" 时机: {interrupt.when}") # 'during' 表示动态断点
print(f" 原因:\n{interrupt.value}")
# --------------------------
# 9. 尝试直接恢复(会失败)
# --------------------------
print("\n" + "-" * 40)
print("❌ 尝试 1:直接恢复(未修改状态)")
print("-" * 40)
for event in graph.stream(None, thread2, stream_mode="values"):
print(f"状态: {event}")
state2_retry = graph.get_state(thread2)
print(f"结果: 仍然在 {state2_retry.next} 处中断(状态未改变,无法通过检查)")
# --------------------------
# 10. 修改状态后恢复(成功)
# --------------------------
print("\n" + "-" * 40)
print("✅ 尝试 2:修改状态后恢复")
print("-" * 40)
# 更新状态为短字符串
graph.update_state(thread2, {"input": "hi"})
print("已将输入修改为 'hi'")
# 继续执行
for event in graph.stream(None, thread2, stream_mode="values"):
print(f"状态: {event}")
state2_final = graph.get_state(thread2)
print(f"\n执行结果: next={state2_final.next}, 已完成!")
# --------------------------
# 11. 高级示例:带结构化信息的中断
# --------------------------
print("\n" + "=" * 60)
print("🔧 高级示例:带结构化信息的动态中断")
print("=" * 60)
import json
class RichState(TypedDict):
content: str
risk_score: float
def content_check(state: RichState) -> RichState:
"""内容检查:模拟风险评估"""
content = state['content']
# 模拟风险评估
risk_score = len(content) / 100 # 简单示例:长度越长风险越高
state['risk_score'] = risk_score
if risk_score > 0.5:
# 传递结构化的中断信息
info = {
"reason": "内容风险评分过高",
"risk_score": round(risk_score, 2),
"threshold": 0.5,
"suggestion": "建议缩短内容或人工审核"
}
raise NodeInterrupt(json.dumps(info, ensure_ascii=False, indent=2))
return state
def process_content(state: RichState) -> RichState:
"""处理内容"""
print(f"✅ 内容处理完成,风险评分: {state['risk_score']:.2f}")
return state
# 构建高级示例图
builder2 = StateGraph(RichState)
builder2.add_node("check", content_check)
builder2.add_node("process", process_content)
builder2.add_edge(START, "check")
builder2.add_edge("check", "process")
builder2.add_edge("process", END)
memory2 = MemorySaver()
graph2 = builder2.compile(checkpointer=memory2)
# 测试高风险内容
thread3 = {"configurable": {"thread_id": "high-risk"}}
print("\n测试高风险内容(60 字符):")
for event in graph2.stream({"content": "x" * 60, "risk_score": 0.0}, thread3, stream_mode="values"):
print(f"状态: content长度={len(event['content'])}")
# 获取结构化中断信息
state3 = graph2.get_state(thread3)
if state3.tasks and state3.tasks[0].interrupts:
info = json.loads(state3.tasks[0].interrupts[0].value)
print(f"\n📊 结构化中断信息:")
print(f" 原因: {info['reason']}")
print(f" 风险评分: {info['risk_score']}")
print(f" 阈值: {info['threshold']}")
print(f" 建议: {info['suggestion']}")
print("\n✨ 演示完成!")运行结果示例:
📊 图结构可视化:
[显示图:START → step_1 → step_2 → step_3 → END]
============================================================
📗 场景 1:短输入 'hi'(不触发中断)
============================================================
状态: {'input': 'hi'}
---Step 1: 处理输入---
状态: {'input': 'hi'}
---Step 2: 长度检查通过---
状态: {'input': 'hi'}
---Step 3: 处理完成---
状态: {'input': 'hi'}
执行结果: next=(), 已完成
============================================================
📕 场景 2:长输入 'hello world'(触发动态中断)
============================================================
状态: {'input': 'hello world'}
---Step 1: 处理输入---
状态: {'input': 'hello world'}
----------------------------------------
🔍 检查中断状态
----------------------------------------
下一个待执行节点: ('step_2',)
当前状态值: {'input': 'hello world'}
📌 中断信息:
节点: step_2
时机: during
原因:
输入长度超过限制!当前长度: 11 字符 (最大: 5)
原始输入: 'hello world'
建议:请缩短输入后重试代码要点说明:
| 要点 | 说明 |
|---|---|
raise NodeInterrupt(message) | 在节点内部抛出,触发动态中断 |
interrupt.when == 'during' | 表示中断发生在节点执行期间(动态断点特征) |
graph.update_state(config, new_values) | 修改状态,解决中断原因 |
checkpointer=memory | 必需!用于保存中断状态和支持恢复 |
| 传递 JSON 字符串 | 可以传递结构化信息,便于解析和展示 |
🔍 常见问题
Q1: NodeInterrupt 和普通 Python 异常有什么区别?
普通异常: 表示错误,需要捕获和处理
try:
result = risky_operation()
except ValueError as e:
handle_error(e) # 错误处理逻辑NodeInterrupt: 表示需要暂停,由 LangGraph 自动处理
if needs_human_review(data):
raise NodeInterrupt("需要人工审核") # 不需要 try-except关键区别:
- 普通异常会中断程序执行(除非捕获)
- NodeInterrupt 会暂停图执行,但可以恢复
- 普通异常表示"出错了",NodeInterrupt 表示"需要等待"
Q2: 为什么必须使用 MemorySaver?
因为动态断点需要:
- 保存中断时的状态
- 记录中断原因
- 支持状态恢复
没有 checkpointer,图无法保存这些信息:
# ❌ 错误 - 无法使用动态断点
graph = builder.compile() # 没有 checkpointer
# ✅ 正确
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)生成的流程图:

Q3: 能否在同一个节点中多次抛出 NodeInterrupt?
不能。一旦抛出 NodeInterrupt,节点执行立即停止:
def check(state):
if condition_1:
raise NodeInterrupt("条件 1 触发")
# ❌ 如果条件 1 为真,这里永远不会执行
if condition_2:
raise NodeInterrupt("条件 2 触发")解决方案: 合并条件或使用多个节点
def check(state):
errors = []
if condition_1:
errors.append("条件 1 失败")
if condition_2:
errors.append("条件 2 失败")
if errors:
raise NodeInterrupt(f"检测到问题: {', '.join(errors)}")Q4: 如何在中断后完全取消执行,而不是继续?
有两种方法:
方法 1:更新状态为终止标志
class State(TypedDict):
input: str
cancelled: bool
def check(state):
if should_cancel(state):
raise NodeInterrupt("操作需要取消")
return state
def next_step(state):
if state.get('cancelled'):
return END # 直接结束
# 正常逻辑
return state
# 用户决定取消
graph.update_state(thread_config, {"cancelled": True})方法 2:不调用 stream 继续执行
# 检查中断原因
state = graph.get_state(thread_config)
if "cancel" in state.tasks[0].interrupts[0].value.lower():
# 不调用 stream,执行就此结束
print("操作已取消")
else:
# 否则修改状态并继续
graph.update_state(thread_config, new_state)
graph.stream(None, thread_config)Q5: 能否在条件边中使用动态断点?
不能直接在条件函数中抛出 NodeInterrupt:
# ❌ 这不会工作
def route(state):
if needs_approval(state):
raise NodeInterrupt("需要审批") # 条件函数中无效
return "next_node"
builder.add_conditional_edges("start", route)正确做法: 在节点中抛出,然后用条件边路由
def approval_node(state):
if needs_approval(state):
raise NodeInterrupt("需要审批")
return state
def route(state):
if state.get('approved'):
return "continue"
else:
return "reject"
builder.add_node("approval", approval_node)
builder.add_conditional_edges("approval", route, ["continue", "reject"])📖 扩展阅读
🎉 总结
Dynamic Breakpoints 是 LangGraph 中强大的人机交互机制,允许图根据业务逻辑智能地决定何时需要人工介入。关键要点:
- 使用
NodeInterrupt在节点内部触发动态中断 - 传递详细信息 告诉用户为什么会中断
- 检查状态 使用
get_state()了解中断原因 - 修改状态 使用
update_state()解决问题 - 继续执行 调用
stream(None, ...)从中断点恢复
通过结合静态断点和动态断点,你可以构建灵活、智能的 AI 应用,在自动化和人工控制之间找到完美的平衡!