LangGraph Dynamic Breakpoints 详细解读
📚 概述
本文档详细解读 LangGraph 中的 Dynamic Breakpoints(动态断点) 功能。这是一种高级的人机交互(Human-in-the-Loop)技术,允许图在运行时根据特定条件自动中断,而不是由开发者预先在节点上设置固定的断点。
🎯 核心概念
什么是 Dynamic Breakpoints?
Dynamic Breakpoints(动态断点)是 LangGraph 提供的一种内部中断机制,与传统的静态断点不同:
特性 | 静态断点 | 动态断点 |
---|---|---|
设置时机 | 图编译时预先设置 | 运行时根据条件触发 |
触发方式 | 固定在特定节点 | 节点内部逻辑决定 |
灵活性 | 每次执行都会中断 | 可以条件性中断 |
信息传递 | 无法传递中断原因 | 可以传递详细信息 |
为什么需要 Dynamic Breakpoints?
回顾人机交互的三大应用场景:
- Approval(审批) - 中断后等待用户批准某个操作
- Debugging(调试) - 回溯图的执行过程,重现或避免问题
- Editing(编辑) - 修改状态,注入人类反馈
Dynamic Breakpoints 特别适合以下场景:
✅ 条件性中断:只在满足特定条件时才中断(如输入过长、检测到敏感内容、风险评分过高)
✅ 信息传递:告诉用户为什么会中断(如"输入超过字数限制"、"检测到不安全内容")
✅ 智能控制:根据业务逻辑动态决定是否需要人工介入
🎭 实战案例:条件长度检查系统
我们将构建一个简单的三步处理流程,当输入长度超过 5 个字符时自动中断,要求人工审核。
系统需求
流程:
- Step 1:接收并处理输入
- Step 2:检查输入长度,如果超过 5 个字符则中断
- Step 3:继续处理(只有通过 Step 2 后才执行)
系统架构图
用户输入
↓
[step_1] 处理输入
↓
[step_2] 检查长度
↓ (len > 5 ?)
├─ Yes → 抛出 NodeInterrupt (中断并传递原因)
├─ No → 继续
↓
[step_3] 最终处理
↓
END
🔧 代码实现详解
1. 导入依赖
from IPython.display import Image, display
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt # ⭐ 核心:动态中断
from langgraph.graph import START, END, StateGraph
关键导入:
NodeInterrupt
:这是实现动态断点的核心类MemorySaver
:用于保存状态,支持中断后恢复
2. 定义状态
class State(TypedDict):
input: str # 简单状态:只包含输入字符串
说明:
- 这是一个极简的状态定义,只包含用户输入
- 实际应用中可以包含更多字段(如处理结果、元数据等)
3. 实现三个处理节点
Step 1:初始处理
def step_1(state: State) -> State:
print("---Step 1---")
return state # 直接返回状态,不做修改
功能: 这是第一个处理步骤,这里只是简单输出日志。
Step 2:条件检查与动态中断 ⭐
def step_2(state: State) -> State:
# 检查输入长度
if len(state['input']) > 5:
# 抛出 NodeInterrupt,传递中断原因
raise NodeInterrupt(f"Received input that is longer than 5 characters: {state['input']}")
print("---Step 2---")
return state
核心机制详解:
raise NodeInterrupt(message)
# ^^^^^^^^^^^^^ ^^^^^^^
# 动态中断类 中断信息(传递给用户)
关键点:
- 条件判断:
len(state['input']) > 5
- 只在满足条件时中断 - 抛出异常:使用
raise NodeInterrupt
触发中断 - 传递信息:字符串参数会被记录到状态中,用户可以看到中断原因
- 执行停止:一旦抛出,节点的后续代码不会执行,图会立即暂停
与 Python 异常处理的关系:
NodeInterrupt
是一个特殊的异常类,但它不是用来处理错误的,而是用来有意地暂停执行:
# 普通异常 - 表示错误
try:
result = risky_operation()
except Exception as e:
handle_error(e)
# NodeInterrupt - 表示需要人工介入
if needs_human_review(data):
raise NodeInterrupt("需要人工审核") # 不需要 try-except
Step 3:最终处理
def step_3(state: State) -> State:
print("---Step 3---")
return state
功能: 只有通过 Step 2 的检查后,才会执行这一步。
4. 构建图
builder = StateGraph(State)
# 添加三个节点
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
# 添加边:定义执行顺序
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)
# 设置内存检查点(必需!)
memory = MemorySaver()
# 编译图
graph = builder.compile(checkpointer=memory)
重要提醒:
checkpointer=memory
是必需的!- 没有 checkpointer,图无法在中断后恢复
- Memory Saver 会在每个节点后保存状态快照
5. 执行图:触发动态中断
initial_input = {"input": "hello world"} # 11 个字符,超过 5
thread_config = {"configurable": {"thread_id": "1"}}
# 运行图
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
print(event)
输出:
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}
分析:
- 图开始执行,状态为
{"input": "hello world"}
step_1
执行并输出日志step_2
开始执行,检测到长度超过 5,抛出NodeInterrupt
- 图立即暂停,不会执行
step_3
6. 检查中断状态
查看下一个待执行节点
state = graph.get_state(thread_config)
print(state.next)
输出:
('step_2',)
说明:
state.next
显示下一个要执行的节点是step_2
- 因为
step_2
中断了,它会重新执行(除非状态被修改)
查看中断信息
print(state.tasks)
输出:
(PregelTask(
id='6eb3910d-e231-5ba2-b25e-28ad575690bd',
name='step_2',
error=None,
interrupts=(
Interrupt(
value='Received input that is longer than 5 characters: hello world',
when='during'
),
),
state=None
),)
关键信息解读:
字段 | 值 | 说明 |
---|---|---|
name | 'step_2' | 中断发生在哪个节点 |
error | None | 这不是错误,所以为 None |
interrupts | Interrupt(...) | 中断详情 |
value | 'Received input...' | 我们传递的中断原因 |
when | 'during' | 中断发生在节点执行期间 |
when
字段说明:
'during'
:中断发生在节点内部(动态断点)'before'
:中断发生在节点之前(静态断点)
7. 尝试恢复执行(失败案例)
# 尝试继续执行
for event in graph.stream(None, thread_config, stream_mode="values"):
print(event)
输出:
{'input': 'hello world'}
再次检查状态:
state = graph.get_state(thread_config)
print(state.next)
输出:
('step_2',)
问题分析:
- 调用
graph.stream(None, ...)
会重新执行step_2
- 但状态没有改变,
input
仍然是"hello world"
- 所以再次触发
NodeInterrupt
,陷入循环!
教训: 必须修改状态才能通过中断点。
8. 修改状态并恢复执行(成功案例)
更新状态
graph.update_state(
thread_config,
{"input": "hi"}, # 修改输入为短字符串(2 个字符)
)
输出:
{
'configurable': {
'thread_id': '1',
'checkpoint_ns': '',
'checkpoint_id': '1ef6a434-06cf-6f1e-8002-0ea6dc69e075'
}
}
说明:
update_state
会修改当前状态- 返回的是新的 checkpoint ID
- 下次执行时会使用新状态
继续执行
for event in graph.stream(None, thread_config, stream_mode="values"):
print(event)
输出:
{'input': 'hi'}
---Step 2---
{'input': 'hi'}
---Step 3---
{'input': 'hi'}
成功!分析:
- 从
step_2
重新开始执行 - 此时
state['input']
是"hi"
(2 个字符) - 长度检查通过,不会抛出
NodeInterrupt
- 继续执行
step_3
,完成整个流程
🎓 核心知识点总结
LangGraph 特有概念
1. NodeInterrupt 类
语法:
from langgraph.errors import NodeInterrupt
raise NodeInterrupt(message: str)
特点:
- 可以在节点内部的任何位置抛出
- 必须传递一个字符串参数(中断原因)
- 会立即停止节点执行
- 中断信息会保存到
state.tasks[].interrupts
与静态断点的对比:
# 静态断点 - 在编译时设置
graph = builder.compile(
checkpointer=memory,
interrupt_before=["step_2"] # 总是在 step_2 之前中断
)
# 动态断点 - 在运行时触发
def step_2(state):
if some_condition(state):
raise NodeInterrupt("条件满足,需要人工审核") # 条件性中断
return state
2. 中断后的状态管理
state.next
显示下一个待执行的节点:
state = graph.get_state(thread_config)
print(state.next) # ('step_2',)
state.tasks
包含所有待执行任务的详细信息:
state.tasks[0].name # 节点名称
state.tasks[0].interrupts # 中断信息列表
state.tasks[0].error # 错误信息(如果有)
3. 状态恢复流程
1. 中断发生
↓
2. 调用 get_state() 检查状态
↓
3. 调用 update_state() 修改状态
↓
4. 调用 stream(None, ...) 继续执行
↓
5. 从中断点重新开始
关键细节:
stream(None, ...)
中的None
表示不提供新输入,使用当前状态- 如果状态未修改,会重新执行相同的逻辑(可能再次中断)
Python 特有知识点
1. TypedDict 基础
from typing_extensions import TypedDict
class State(TypedDict):
input: str
作用:
- 定义字典的结构和类型
- 提供 IDE 自动补全和类型检查
- LangGraph 使用它来验证状态
使用示例:
# 创建状态
state: State = {"input": "hello"}
# 类型检查会捕获错误
state = {"input": 123} # ❌ 错误:应该是 str
state = {"output": "hi"} # ❌ 错误:缺少 'input' 键
2. Python 异常机制
虽然 NodeInterrupt
是一个异常,但在 LangGraph 中不需要手动捕获:
# ✅ 正确用法
def step_2(state):
if condition:
raise NodeInterrupt("reason") # LangGraph 会自动处理
return state
# ❌ 不需要这样做
def step_2(state):
try:
if condition:
raise NodeInterrupt("reason")
except NodeInterrupt as e:
# 不需要捕获!LangGraph 会自动处理
pass
原理: LangGraph 的执行引擎会捕获 NodeInterrupt
,保存状态,然后优雅地暂停执行。
3. 字符串格式化(f-string)
raise NodeInterrupt(f"Received input that is longer than 5 characters: {state['input']}")
f-string 语法:
- 以
f
开头的字符串 - 使用
{}
插入变量或表达式 - 在运行时自动计算和格式化
等价写法:
# f-string (推荐)
message = f"Input: {state['input']}, Length: {len(state['input'])}"
# .format() (传统)
message = "Input: {}, Length: {}".format(state['input'], len(state['input']))
# % 格式化 (老式)
message = "Input: %s, Length: %d" % (state['input'], len(state['input']))
💡 最佳实践
1. 何时使用动态断点?
✅ 适用场景:
- 内容审核:检测敏感词、违规内容时中断
- 风险评估:风险评分超过阈值时需要人工审核
- 资源限制:输入过大、请求过多时中断
- 用户确认:需要根据 AI 的判断决定是否需要用户确认
- 调试模式:在开发时根据日志级别动态中断
❌ 不适用场景:
- 总是需要中断的节点(用静态断点更简单)
- 节点之间的流程控制(用条件边)
- 错误处理(用 try-except)
2. 动态断点的信息传递技巧
技巧 1:结构化信息
import json
def check_content(state):
risk_score = calculate_risk(state['input'])
if risk_score > 0.8:
# 传递 JSON 格式的详细信息
info = {
"reason": "高风险内容",
"risk_score": risk_score,
"detected_issues": ["暴力", "敏感词"],
"suggestion": "建议人工审核或拒绝"
}
raise NodeInterrupt(json.dumps(info, ensure_ascii=False))
return state
用户端解析:
state = graph.get_state(thread_config)
interrupt_value = state.tasks[0].interrupts[0].value
info = json.loads(interrupt_value)
print(f"风险评分: {info['risk_score']}")
print(f"建议: {info['suggestion']}")
技巧 2:多条件检查
def validate_input(state):
errors = []
# 检查 1:长度
if len(state['input']) > 1000:
errors.append("输入超过 1000 字符")
# 检查 2:格式
if not is_valid_format(state['input']):
errors.append("格式不符合要求")
# 检查 3:内容
if contains_profanity(state['input']):
errors.append("包含不当内容")
if errors:
raise NodeInterrupt(f"验证失败: {', '.join(errors)}")
return state
技巧 3:提供修复建议
def check_data(state):
data = state['data']
if missing_fields := [f for f in required_fields if f not in data]:
suggestion = f"请补充以下字段: {', '.join(missing_fields)}"
raise NodeInterrupt(f"数据不完整。{suggestion}")
return state
3. 状态修改策略
策略 1:最小修改原则
只修改必要的字段,保留其他信息:
# ✅ 好的做法 - 只修改需要的字段
graph.update_state(thread_config, {"input": "hi"})
# ❌ 避免 - 覆盖整个状态(可能丢失其他信息)
graph.update_state(thread_config, {"input": "hi", "other_field": None})
策略 2:保留历史记录
class State(TypedDict):
input: str
original_input: str # 保留原始输入
modification_history: list # 记录修改历史
def step_2(state):
if len(state['input']) > 5:
raise NodeInterrupt(f"输入过长: {state['input']}")
return state
# 修改时保留历史
state = graph.get_state(thread_config)
original = state.values['input']
graph.update_state(thread_config, {
"input": "hi",
"original_input": original,
"modification_history": [{"from": original, "to": "hi", "reason": "长度超限"}]
})
策略 3:条件性修改
# 根据中断原因决定如何修改
state = graph.get_state(thread_config)
interrupt_msg = state.tasks[0].interrupts[0].value
if "长度" in interrupt_msg:
# 截断输入
graph.update_state(thread_config, {"input": original[:5]})
elif "格式" in interrupt_msg:
# 修正格式
graph.update_state(thread_config, {"input": fix_format(original)})
4. 与 LangGraph API/Studio 集成
在实际应用中,通常会通过 API 与图交互。以下是完整的工作流程:
连接到 LangGraph 服务
from langgraph_sdk import get_client
# 连接到本地开发服务器
URL = "http://127.0.0.1:2024"
client = get_client(url=URL)
# 搜索可用的图
assistants = await client.assistants.search()
创建线程并执行
# 创建新线程
thread = await client.threads.create()
# 执行图
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="dynamic_breakpoints",
input={"input": "hello world"},
stream_mode="values"
):
print(f"Event type: {chunk.event}")
print(chunk.data)
输出示例:
Event type: metadata
{'run_id': '1ef6a43a-1b04-64d0-9a79-1caff72c8a89'}
Event type: values
{'input': 'hello world'}
Event type: values
{'input': 'hello world'}
检查中断状态
# 获取当前状态
current_state = await client.threads.get_state(thread['thread_id'])
print(current_state['next']) # ['step_2']
print(current_state['tasks'][0]['interrupts']) # 中断详情
更新状态并恢复
# 更新状态
await client.threads.update_state(
thread['thread_id'],
{"input": "hi!"}
)
# 继续执行(传入 None)
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="dynamic_breakpoints",
input=None, # ⭐ None 表示使用当前状态
stream_mode="values"
):
print(chunk.data)
输出:
{'input': 'hi!'}
---Step 2---
{'input': 'hi!'}
---Step 3---
{'input': 'hi!'}
🚀 进阶技巧
1. 多级中断条件
def advanced_check(state):
input_text = state['input']
# 第一级:严重问题 - 立即拒绝
if contains_illegal_content(input_text):
raise NodeInterrupt(
"严重违规:检测到非法内容,已自动拒绝。"
)
# 第二级:中等风险 - 需要审核
risk = calculate_risk(input_text)
if risk > 0.7:
raise NodeInterrupt(
f"中等风险 (评分: {risk}):建议人工审核。"
)
# 第三级:轻微问题 - 警告但继续
if risk > 0.4:
print(f"⚠️ 警告:检测到轻微风险 (评分: {risk})")
return state
2. 带超时的中断
from datetime import datetime, timedelta
class State(TypedDict):
input: str
interrupt_timestamp: str
max_wait_seconds: int
def timed_check(state):
if needs_approval(state['input']):
timestamp = datetime.now().isoformat()
raise NodeInterrupt(
f"需要审批。超时时间:{state.get('max_wait_seconds', 3600)} 秒"
)
return state
# 恢复时检查超时
state = graph.get_state(thread_config)
if state.tasks and state.tasks[0].interrupts:
interrupt_time = datetime.fromisoformat(state.values['interrupt_timestamp'])
if datetime.now() - interrupt_time > timedelta(seconds=state.values['max_wait_seconds']):
# 超时处理:自动拒绝或使用默认值
graph.update_state(thread_config, {"input": "TIMEOUT_REJECTED"})
3. 中断后的多路径选择
class State(TypedDict):
input: str
approval_decision: str # "approve", "reject", "modify"
def check_content(state):
if needs_review(state['input']):
raise NodeInterrupt("内容需要审核,请选择操作")
return state
def handle_decision(state):
decision = state.get('approval_decision', 'reject')
if decision == 'approve':
return {"status": "approved"}
elif decision == 'modify':
return {"input": state.get('modified_input', state['input'])}
else:
return {"status": "rejected"}
# 用户决策后更新
graph.update_state(thread_config, {
"approval_decision": "modify",
"modified_input": "修改后的内容"
})
4. 组合静态与动态断点
# 编译时设置静态断点
graph = builder.compile(
checkpointer=memory,
interrupt_before=["final_step"] # 总是在最后一步前中断
)
# 节点中使用动态断点
def middle_step(state):
if high_risk(state['data']):
raise NodeInterrupt("检测到高风险,需要额外审核") # 动态中断
return state
组合效果:
middle_step
:只在高风险时中断final_step
:总是中断(最终确认)
📊 静态 vs 动态断点对比
特性 | 静态断点 | 动态断点 |
---|---|---|
设置方式 | interrupt_before=["node"] | raise NodeInterrupt() |
中断时机 | 节点执行前 | 节点执行中 |
触发条件 | 总是触发 | 根据逻辑条件 |
信息传递 | 无 | 可以传递详细信息 |
灵活性 | 低(固定节点) | 高(动态决定) |
适用场景 | 固定审批点 | 条件性审核 |
实际应用建议:
# 静态断点 - 用于固定流程
interrupt_before=["deploy", "payment"] # 总是需要确认的步骤
# 动态断点 - 用于智能判断
def content_filter(state):
if ai_detects_issue(state['content']): # AI 判断
raise NodeInterrupt("AI 检测到潜在问题")
return state
🎯 实际应用案例
案例 1:内容审核系统
class ContentState(TypedDict):
content: str
category: str
risk_score: float
def content_moderation(state: ContentState):
# 使用 AI 模型评估内容
result = moderation_model.analyze(state['content'])
state['risk_score'] = result.risk_score
# 高风险:立即拒绝
if result.risk_score > 0.9:
raise NodeInterrupt(
f"高风险内容 (评分: {result.risk_score}),已自动拒绝。"
f"检测到的问题: {', '.join(result.issues)}"
)
# 中风险:人工审核
if result.risk_score > 0.5:
raise NodeInterrupt(
f"中风险内容 (评分: {result.risk_score}),需要人工审核。"
f"可疑点: {', '.join(result.warnings)}"
)
# 低风险:自动通过
return state
# 人工审核后的处理
def handle_review(state):
decision = state.get('review_decision')
if decision == 'approve':
return {"status": "approved"}
elif decision == 'reject':
return {"status": "rejected"}
else:
return {"status": "needs_modification"}
案例 2:金融交易审批
class TransactionState(TypedDict):
amount: float
account: str
risk_factors: list
approval_required: bool
def transaction_check(state: TransactionState):
amount = state['amount']
# 规则 1:大额交易
if amount > 10000:
raise NodeInterrupt(
f"大额交易需审批:金额 ${amount:,.2f} 超过限额 $10,000"
)
# 规则 2:异常模式
if detect_anomaly(state['account']):
risk_factors = analyze_risk(state)
raise NodeInterrupt(
f"检测到异常交易模式。风险因素: {', '.join(risk_factors)}"
)
# 规则 3:新账户
if is_new_account(state['account']):
raise NodeInterrupt(
f"新账户首次大额交易,需要身份验证"
)
return state
案例 3:AI 助手的敏感操作确认
class AssistantState(TypedDict):
user_request: str
planned_action: str
destructive: bool
def plan_execution(state: AssistantState):
action = parse_action(state['user_request'])
state['planned_action'] = action
# 检测破坏性操作
destructive_keywords = ['delete', 'remove', 'drop', 'truncate']
if any(kw in action.lower() for kw in destructive_keywords):
state['destructive'] = True
raise NodeInterrupt(
f"⚠️ 检测到潜在破坏性操作: {action}\n"
f"此操作可能无法撤销,需要您的明确确认。"
)
return state
# 用户确认后继续
graph.update_state(thread_config, {"user_confirmed": True})
案例 4:多步骤验证流程
class VerificationState(TypedDict):
user_id: str
verification_steps: list
current_step: int
def verification_flow(state: VerificationState):
steps = ['identity', 'email', 'phone', 'document']
current = state['current_step']
if current < len(steps):
step_name = steps[current]
raise NodeInterrupt(
f"验证步骤 {current + 1}/{len(steps)}: 请完成{step_name}验证"
)
return {"verification_complete": True}
# 用户完成每一步后更新
for step in range(4):
# 用户完成验证
user_completes_verification(step)
# 更新状态
graph.update_state(thread_config, {"current_step": step + 1})
# 继续执行
graph.stream(None, thread_config)
🔍 常见问题
Q1: NodeInterrupt 和普通 Python 异常有什么区别?
普通异常: 表示错误,需要捕获和处理
try:
result = risky_operation()
except ValueError as e:
handle_error(e) # 错误处理逻辑
NodeInterrupt: 表示需要暂停,由 LangGraph 自动处理
if needs_human_review(data):
raise NodeInterrupt("需要人工审核") # 不需要 try-except
关键区别:
- 普通异常会中断程序执行(除非捕获)
- NodeInterrupt 会暂停图执行,但可以恢复
- 普通异常表示"出错了",NodeInterrupt 表示"需要等待"
Q2: 为什么必须使用 MemorySaver?
因为动态断点需要:
- 保存中断时的状态
- 记录中断原因
- 支持状态恢复
没有 checkpointer,图无法保存这些信息:
# ❌ 错误 - 无法使用动态断点
graph = builder.compile() # 没有 checkpointer
# ✅ 正确
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
Q3: 能否在同一个节点中多次抛出 NodeInterrupt?
不能。一旦抛出 NodeInterrupt
,节点执行立即停止:
def check(state):
if condition_1:
raise NodeInterrupt("条件 1 触发")
# ❌ 如果条件 1 为真,这里永远不会执行
if condition_2:
raise NodeInterrupt("条件 2 触发")
解决方案: 合并条件或使用多个节点
def check(state):
errors = []
if condition_1:
errors.append("条件 1 失败")
if condition_2:
errors.append("条件 2 失败")
if errors:
raise NodeInterrupt(f"检测到问题: {', '.join(errors)}")
Q4: 如何在中断后完全取消执行,而不是继续?
有两种方法:
方法 1:更新状态为终止标志
class State(TypedDict):
input: str
cancelled: bool
def check(state):
if should_cancel(state):
raise NodeInterrupt("操作需要取消")
return state
def next_step(state):
if state.get('cancelled'):
return END # 直接结束
# 正常逻辑
return state
# 用户决定取消
graph.update_state(thread_config, {"cancelled": True})
方法 2:不调用 stream 继续执行
# 检查中断原因
state = graph.get_state(thread_config)
if "cancel" in state.tasks[0].interrupts[0].value.lower():
# 不调用 stream,执行就此结束
print("操作已取消")
else:
# 否则修改状态并继续
graph.update_state(thread_config, new_state)
graph.stream(None, thread_config)
Q5: 能否在条件边中使用动态断点?
不能直接在条件函数中抛出 NodeInterrupt
:
# ❌ 这不会工作
def route(state):
if needs_approval(state):
raise NodeInterrupt("需要审批") # 条件函数中无效
return "next_node"
builder.add_conditional_edges("start", route)
正确做法: 在节点中抛出,然后用条件边路由
def approval_node(state):
if needs_approval(state):
raise NodeInterrupt("需要审批")
return state
def route(state):
if state.get('approved'):
return "continue"
else:
return "reject"
builder.add_node("approval", approval_node)
builder.add_conditional_edges("approval", route, ["continue", "reject"])
📖 扩展阅读
🎉 总结
Dynamic Breakpoints 是 LangGraph 中强大的人机交互机制,允许图根据业务逻辑智能地决定何时需要人工介入。关键要点:
- 使用
NodeInterrupt
在节点内部触发动态中断 - 传递详细信息 告诉用户为什么会中断
- 检查状态 使用
get_state()
了解中断原因 - 修改状态 使用
update_state()
解决问题 - 继续执行 调用
stream(None, ...)
从中断点恢复
通过结合静态断点和动态断点,你可以构建灵活、智能的 AI 应用,在自动化和人工控制之间找到完美的平衡!