LangGraph 人在环 (HITL) 详细解读
概述
Human-in-the-Loop (HITL),中文译为"人在环"或"人机协同",是 AI 工作流中一种让人类参与到 AI 决策过程中的机制。简单来说,就是让 AI 在关键步骤暂停,等待人类审核、确认或提供输入后,再继续执行。
想象一下:AI 就像一个勤劳的助手,HITL 机制让你可以在任何时候喊"停",检查它在做什么,给它反馈,然后让它继续干活。
术语表
| 术语名称 | 通俗解释 | Python 语法 | 重要程度 |
|---|---|---|---|
| interrupt() | LangGraph 1.0 新增的"暂停键",在代码里调用它就能暂停图的执行 | from langgraph.types import interrupt | 核心 |
| Command | 恢复执行的"遥控器",用来告诉图继续执行并传入人类的输入 | from langgraph.types import Command | 核心 |
| resume | Command 的参数,存放人类提供的输入值 | Command(resume="your_input") | 核心 |
| Checkpointer | 状态存档器,自动保存图执行的进度,就像游戏存档一样 | InMemorySaver() 或 PostgresSaver() | 核心 |
| thread_id | 会话标识符,用于区分不同的对话/用户,类似于聊天窗口ID | {"configurable": {"thread_id": "123"}} | 核心 |
| interrupt_before | 旧版静态断点,在指定节点执行前暂停(不推荐用于 HITL) | graph.compile(interrupt_before=["node"]) | 了解 |
| interrupt_after | 旧版静态断点,在指定节点执行后暂停(不推荐用于 HITL) | graph.compile(interrupt_after=["node"]) | 了解 |
核心概念
为什么需要 HITL?
在实际应用中,完全自主的 AI Agent 存在风险:
- 安全性:AI 可能执行危险操作(如删除数据、转账)
- 准确性:AI 可能做出错误判断需要人工纠正
- 合规性:某些操作必须有人工审批记录
- 用户体验:需要在关键步骤获取用户确认或额外信息
LangGraph 1.0 的 interrupt() vs 旧版断点
| 特性 | interrupt() (推荐) | interrupt_before/after (旧版) |
|---|---|---|
| 灵活性 | 可在代码任意位置调用 | 只能在节点边界设置 |
| 传值 | 直接返回人类输入值 | 需要手动更新状态 |
| 条件控制 | 可以放在 if 语句中 | 无法条件触发 |
| 生产就绪 | 设计用于生产环境 | 主要用于调试 |
HITL 的三大应用场景
+-----------------------------------------------------------+
| Human-in-the-Loop |
+---------------+-------------------+-----------------------+
| 审批 Approve | 编辑 Edit | 输入 Input |
+---------------+-------------------+-----------------------+
| - 工具调用前确认| - 修改 AI 生成内容 | - 获取用户澄清信息 |
| - 敏感操作审批 | - 纠正错误判断 | - 多轮对话收集信息 |
| - 高风险动作拦截| - 调整参数设置 | - 动态补充上下文 |
+---------------+-------------------+-----------------------+代码实现详解
示例一:最简单的 HITL(入门级)
这是最基础的 HITL 示例,展示如何暂停并等待用户输入:
python
"""
示例1:最简单的 HITL
目标:在工作流中暂停,等待用户确认后继续
"""
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from langgraph.checkpoint.memory import InMemorySaver
# 第一步:定义状态(图需要记住什么)
class State(TypedDict):
message: str
confirmed: bool
# 第二步:定义节点函数
def ask_confirmation(state: State):
"""这个节点会暂停,等待用户确认"""
print(f"收到消息: {state['message']}")
# 关键:调用 interrupt() 暂停执行
# 括号里的字符串是提示信息,会返回给调用方
user_response = interrupt("请确认是否继续?(yes/no)")
# 用户恢复执行后,user_response 就是用户输入的值
is_confirmed = user_response.lower() == "yes"
return {"confirmed": is_confirmed}
def process_result(state: State):
"""根据用户确认结果处理"""
if state["confirmed"]:
print("用户已确认,继续执行...")
return {"message": "操作已完成!"}
else:
print("用户取消,停止执行...")
return {"message": "操作已取消"}
# 第三步:构建图
builder = StateGraph(State)
builder.add_node("ask", ask_confirmation)
builder.add_node("process", process_result)
builder.add_edge(START, "ask")
builder.add_edge("ask", "process")
builder.add_edge("process", END)
# 关键:必须有 checkpointer 才能使用 interrupt
memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)
# 第四步:执行图
config = {"configurable": {"thread_id": "user-123"}}
# 首次运行 - 会在 interrupt() 处暂停
print("=== 开始执行 ===")
for event in graph.stream({"message": "Hello World"}, config):
print(event)
print("\n=== 图已暂停,等待用户输入 ===\n")
# 模拟用户输入 "yes",使用 Command 恢复执行
from langgraph.types import Command
print("=== 用户输入 yes,恢复执行 ===")
for event in graph.stream(Command(resume="yes"), config):
print(event)运行结果:
=== 开始执行 ===
收到消息: Hello World
{'__interrupt__': (Interrupt(value='请确认是否继续?(yes/no)'),)}
=== 图已暂停,等待用户输入 ===
=== 用户输入 yes,恢复执行 ===
用户已确认,继续执行...
{'process': {'message': '操作已完成!'}}示例二:审批/拒绝模式(中级)
这个示例展示如何根据用户的审批结果,路由到不同的处理分支:
python
"""
示例2:审批/拒绝模式
目标:用户可以批准或拒绝操作,图根据结果走不同分支
"""
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
class WorkflowState(TypedDict):
task: str
user_decision: str
status: str
def get_approval(state: WorkflowState):
"""暂停并等待用户审批"""
print(f"待审批任务: {state['task']}")
# 暂停等待用户输入
decision = interrupt({
"prompt": "请审批此任务",
"task": state["task"],
"options": ["approve", "reject"]
})
print(f"用户决定: {decision}")
return {"user_decision": decision}
def router(state: WorkflowState) -> Command:
"""根据用户决定路由到不同节点"""
decision = state.get("user_decision", "").strip().lower()
if decision == "approve":
return Command(goto="complete_task")
else:
return Command(goto="cancel_task")
def complete_task(state: WorkflowState):
"""批准后执行任务"""
print("任务已批准并完成!")
return {"status": "completed"}
def cancel_task(state: WorkflowState):
"""拒绝后取消任务"""
print("任务已被拒绝并取消")
return {"status": "cancelled"}
# 构建图
builder = StateGraph(WorkflowState)
builder.add_node("get_approval", get_approval)
builder.add_node("router", router)
builder.add_node("complete_task", complete_task)
builder.add_node("cancel_task", cancel_task)
builder.add_edge(START, "get_approval")
builder.add_edge("get_approval", "router")
builder.add_edge("complete_task", END)
builder.add_edge("cancel_task", END)
memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)
# 测试场景1:批准
print("\n" + "="*50)
print("场景1:用户批准任务")
print("="*50)
config1 = {"configurable": {"thread_id": "test-1"}}
initial_input = {"task": "部署生产环境新功能"}
# 运行到中断点
for event in graph.stream(initial_input, config1, stream_mode="values"):
print(f"状态: {event}")
# 用户批准
print("\n--- 用户输入: approve ---\n")
for event in graph.stream(Command(resume="approve"), config1, stream_mode="values"):
print(f"状态: {event}")
# 测试场景2:拒绝
print("\n" + "="*50)
print("场景2:用户拒绝任务")
print("="*50)
config2 = {"configurable": {"thread_id": "test-2"}}
for event in graph.stream(initial_input, config2, stream_mode="values"):
print(f"状态: {event}")
print("\n--- 用户输入: reject ---\n")
for event in graph.stream(Command(resume="reject"), config2, stream_mode="values"):
print(f"状态: {event}")示例三:输入验证循环(中高级)
有时候用户的输入需要验证,无效输入需要重新请求。这个示例展示如何使用循环实现输入验证:
python
"""
示例3:输入验证循环
目标:持续请求用户输入,直到输入有效为止
"""
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
class FormState(TypedDict):
name: str
age: int
email: str
def collect_age(state: FormState):
"""收集并验证年龄输入"""
prompt = "请输入您的年龄(正整数):"
while True:
# 每次循环都会暂停等待输入
answer = interrupt(prompt)
# 验证输入
try:
age = int(answer)
if age > 0 and age < 150:
print(f"年龄验证通过: {age}")
return {"age": age}
else:
prompt = f"'{answer}' 不是有效年龄,请输入1-150之间的数字:"
except ValueError:
prompt = f"'{answer}' 不是数字,请输入有效的年龄:"
def collect_email(state: FormState):
"""收集并验证邮箱输入"""
prompt = "请输入您的邮箱地址:"
while True:
answer = interrupt(prompt)
# 简单的邮箱验证
if "@" in answer and "." in answer:
print(f"邮箱验证通过: {answer}")
return {"email": answer}
else:
prompt = f"'{answer}' 不是有效邮箱,请重新输入(需包含@和.):"
def show_summary(state: FormState):
"""显示收集结果"""
print("\n" + "="*40)
print("信息收集完成!")
print(f" 姓名: {state['name']}")
print(f" 年龄: {state['age']}")
print(f" 邮箱: {state['email']}")
print("="*40)
return state
# 构建图
builder = StateGraph(FormState)
builder.add_node("collect_age", collect_age)
builder.add_node("collect_email", collect_email)
builder.add_node("show_summary", show_summary)
builder.add_edge(START, "collect_age")
builder.add_edge("collect_age", "collect_email")
builder.add_edge("collect_email", "show_summary")
builder.add_edge("show_summary", END)
memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)
# 执行示例
config = {"configurable": {"thread_id": "form-123"}}
initial = {"name": "张三", "age": 0, "email": ""}
print("=== 开始收集信息 ===\n")
# 第一次运行,会在第一个 interrupt 处暂停
for event in graph.stream(initial, config, stream_mode="values"):
if "__interrupt__" in str(event):
print(f"系统提示: {event}")
# 模拟用户输入无效年龄
print("\n用户输入: 'abc' (无效)")
for event in graph.stream(Command(resume="abc"), config, stream_mode="values"):
if "__interrupt__" in str(event):
print(f"系统提示: {event}")
# 再次输入无效年龄
print("\n用户输入: '-5' (无效)")
for event in graph.stream(Command(resume="-5"), config, stream_mode="values"):
if "__interrupt__" in str(event):
print(f"系统提示: {event}")
# 输入有效年龄,进入下一步
print("\n用户输入: '25' (有效)")
for event in graph.stream(Command(resume="25"), config, stream_mode="values"):
if "__interrupt__" in str(event):
print(f"系统提示: {event}")
# 输入无效邮箱
print("\n用户输入: 'invalid' (无效邮箱)")
for event in graph.stream(Command(resume="invalid"), config, stream_mode="values"):
if "__interrupt__" in str(event):
print(f"系统提示: {event}")
# 输入有效邮箱
print("\n用户输入: 'zhangsan@example.com' (有效)")
for event in graph.stream(Command(resume="zhangsan@example.com"), config, stream_mode="values"):
print(f"最终状态: {event}")示例四:工具调用审批(高级)
这是最常见的生产场景:AI Agent 决定调用某个工具前,需要人工审批:
python
"""
示例4:工具调用审批
目标:AI Agent 在调用工具前需要人工审批
"""
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
# 定义工具
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""发送邮件给指定收件人"""
# 实际应用中这里会调用邮件服务
return f"邮件已发送给 {to},主题: {subject}"
@tool
def delete_file(filename: str) -> str:
"""删除指定文件(危险操作)"""
return f"文件 {filename} 已删除"
@tool
def get_weather(city: str) -> str:
"""获取城市天气(安全操作)"""
return f"{city}今天天气晴朗,温度25度C"
# 定义哪些工具需要审批
TOOLS_REQUIRING_APPROVAL = {"send_email", "delete_file"}
# 状态定义
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
# 创建带审批功能的工具节点
def tool_node_with_approval(state: AgentState):
"""执行工具调用,敏感工具需要审批"""
last_message = state["messages"][-1]
results = []
for tool_call in last_message.tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
# 检查是否需要审批
if tool_name in TOOLS_REQUIRING_APPROVAL:
print(f"\n敏感操作需要审批: {tool_name}")
print(f" 参数: {tool_args}")
# 暂停等待审批
approval = interrupt({
"type": "tool_approval",
"tool": tool_name,
"args": tool_args,
"message": f"是否允许执行 {tool_name}?(approve/reject)"
})
if approval.lower() != "approve":
results.append({
"tool_call_id": tool_call["id"],
"content": f"操作被用户拒绝: {tool_name}"
})
continue
print(f"用户已批准 {tool_name}")
# 执行工具
tool_map = {
"send_email": send_email,
"delete_file": delete_file,
"get_weather": get_weather
}
result = tool_map[tool_name].invoke(tool_args)
results.append({
"tool_call_id": tool_call["id"],
"content": result
})
from langchain_core.messages import ToolMessage
return {"messages": [ToolMessage(**r) for r in results]}
# 创建 Agent
tools = [send_email, delete_file, get_weather]
llm = ChatOpenAI(model="gpt-4").bind_tools(tools)
def agent(state: AgentState):
"""Agent 节点:调用 LLM 决定下一步"""
response = llm.invoke(state["messages"])
return {"messages": [response]}
# 构建图
builder = StateGraph(AgentState)
builder.add_node("agent", agent)
builder.add_node("tools", tool_node_with_approval)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent")
memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)
# 测试
config = {"configurable": {"thread_id": "agent-test"}}
print("=== 测试1:查询天气(不需要审批)===")
for event in graph.stream(
{"messages": [("user", "北京今天天气怎么样?")]},
config,
stream_mode="values"
):
if event["messages"]:
print(event["messages"][-1])
print("\n=== 测试2:发送邮件(需要审批)===")
config2 = {"configurable": {"thread_id": "agent-test-2"}}
for event in graph.stream(
{"messages": [("user", "帮我发一封邮件给 boss@company.com,主题是请假申请")]},
config2,
stream_mode="values"
):
if "__interrupt__" in str(event):
print(f"\n等待审批...")
break
if event["messages"]:
print(event["messages"][-1])
# 用户审批
print("\n用户输入: approve")
for event in graph.stream(Command(resume="approve"), config2, stream_mode="values"):
if event["messages"]:
print(event["messages"][-1])示例五:多步骤工作流 + 状态编辑(高级)
这个示例展示如何在暂停时让用户编辑图的状态:
python
"""
示例5:多步骤工作流 + 状态编辑
目标:用户可以在中断时查看并修改 Agent 的工作状态
"""
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
class DocumentState(TypedDict):
title: str
content: str
summary: str
approved: bool
def generate_summary(state: DocumentState):
"""AI 生成摘要"""
# 模拟 AI 生成摘要
summary = f"这是关于'{state['title']}'的摘要:{state['content'][:50]}..."
print(f"AI 生成摘要: {summary}")
return {"summary": summary}
def review_summary(state: DocumentState):
"""人工审核摘要,可以编辑"""
print("\n" + "="*50)
print("请审核 AI 生成的摘要:")
print(f" 标题: {state['title']}")
print(f" 摘要: {state['summary']}")
print("="*50)
# 暂停等待用户反馈
feedback = interrupt({
"type": "review",
"current_summary": state["summary"],
"options": [
"approve - 批准当前摘要",
"edit:新摘要内容 - 修改摘要",
"reject - 拒绝"
]
})
feedback = feedback.strip()
if feedback.lower() == "approve":
print("摘要已批准")
return {"approved": True}
elif feedback.lower().startswith("edit:"):
new_summary = feedback[5:].strip()
print(f"摘要已修改为: {new_summary}")
return {"summary": new_summary, "approved": True}
else:
print("摘要被拒绝")
return {"approved": False}
def finalize(state: DocumentState):
"""最终处理"""
if state["approved"]:
print(f"\n文档处理完成!最终摘要: {state['summary']}")
else:
print("\n文档处理未完成,摘要被拒绝")
return state
# 构建图
builder = StateGraph(DocumentState)
builder.add_node("generate", generate_summary)
builder.add_node("review", review_summary)
builder.add_node("finalize", finalize)
builder.add_edge(START, "generate")
builder.add_edge("generate", "review")
builder.add_edge("review", "finalize")
builder.add_edge("finalize", END)
memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)
# 测试场景:用户编辑摘要
config = {"configurable": {"thread_id": "doc-1"}}
print("=== 开始文档处理 ===\n")
for event in graph.stream({
"title": "LangGraph 入门指南",
"content": "LangGraph 是一个用于构建有状态、多角色应用的框架...",
"summary": "",
"approved": False
}, config, stream_mode="values"):
pass
# 用户选择编辑
print("\n用户输入: edit:LangGraph 是构建 AI Agent 的强大框架")
for event in graph.stream(
Command(resume="edit:LangGraph 是构建 AI Agent 的强大框架"),
config,
stream_mode="values"
):
pass完整案例代码(可直接运行)
以下是一个完整的、可以直接在 Jupyter Notebook 中运行的代码示例,演示 LangGraph 1.0 HITL 的核心功能:
python
# ============================================================
# LangGraph 1.0 Human-in-the-Loop (HITL) 完整示例
# 演示:interrupt()、Command、InMemorySaver 协同工作
# ============================================================
# --------------------------
# 1. 导入必要的库
# --------------------------
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
from IPython.display import Image, display
# --------------------------
# 2. 定义状态
# --------------------------
class TaskState(TypedDict):
task_name: str
task_params: dict
user_decision: str
result: str
# --------------------------
# 3. 定义节点
# --------------------------
def prepare_task(state: TaskState):
"""准备任务,展示将要执行的操作"""
print(f"\n📋 准备执行任务: {state['task_name']}")
print(f" 参数: {state['task_params']}")
return state
def get_user_approval(state: TaskState):
"""暂停等待用户审批"""
print("\n⏸️ 需要用户审批...")
# 关键:使用 interrupt() 暂停执行
decision = interrupt({
"type": "approval_request",
"task": state["task_name"],
"params": state["task_params"],
"prompt": "请选择: approve (批准) / reject (拒绝) / modify:新参数 (修改)"
})
print(f"✅ 收到用户反馈: {decision}")
return {"user_decision": decision}
def process_decision(state: TaskState) -> Command:
"""根据用户决定路由到不同分支"""
decision = state["user_decision"].strip().lower()
if decision == "approve":
return Command(goto="execute_task")
elif decision.startswith("modify:"):
# 提取修改后的参数
new_param = decision[7:].strip()
return Command(goto="execute_task", update={"task_params": {"value": new_param}})
else:
return Command(goto="cancel_task")
def execute_task(state: TaskState):
"""执行任务"""
print(f"\n🚀 执行任务: {state['task_name']}")
print(f" 使用参数: {state['task_params']}")
result = f"任务 '{state['task_name']}' 执行成功,参数: {state['task_params']}"
print(f" 结果: {result}")
return {"result": result}
def cancel_task(state: TaskState):
"""取消任务"""
print(f"\n❌ 任务已取消: {state['task_name']}")
return {"result": "任务已取消"}
# --------------------------
# 4. 构建图
# --------------------------
builder = StateGraph(TaskState)
builder.add_node("prepare", prepare_task)
builder.add_node("approval", get_user_approval)
builder.add_node("router", process_decision)
builder.add_node("execute_task", execute_task)
builder.add_node("cancel_task", cancel_task)
builder.add_edge(START, "prepare")
builder.add_edge("prepare", "approval")
builder.add_edge("approval", "router")
builder.add_edge("execute_task", END)
builder.add_edge("cancel_task", END)
# 关键:必须有 Checkpointer 才能使用 interrupt
memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)
# --------------------------
# 5. 可视化图结构
# --------------------------
print("📊 图结构可视化:")
display(Image(graph.get_graph().draw_mermaid_png()))
# --------------------------
# 6. 场景 1:用户批准
# --------------------------
print("\n" + "=" * 60)
print("📗 场景 1:用户批准任务")
print("=" * 60)
config1 = {"configurable": {"thread_id": "hitl-demo-1"}}
initial_input = {
"task_name": "发送邮件",
"task_params": {"to": "user@example.com", "subject": "测试"},
"user_decision": "",
"result": ""
}
# 执行到 interrupt 处暂停
print("\n▶️ 开始执行(会在审批处暂停)...")
for event in graph.stream(initial_input, config1, stream_mode="values"):
pass
# 检查暂停状态
state1 = graph.get_state(config1)
print(f"\n📍 当前暂停于: {state1.next}")
if state1.tasks and state1.tasks[0].interrupts:
interrupt_info = state1.tasks[0].interrupts[0]
print(f"📌 中断信息: {interrupt_info.value}")
# 用户批准,使用 Command(resume=) 恢复执行
print("\n👤 用户输入: approve")
for event in graph.stream(Command(resume="approve"), config1, stream_mode="values"):
pass
print(f"\n📊 最终结果: {graph.get_state(config1).values['result']}")
# --------------------------
# 7. 场景 2:用户拒绝
# --------------------------
print("\n" + "=" * 60)
print("📕 场景 2:用户拒绝任务")
print("=" * 60)
config2 = {"configurable": {"thread_id": "hitl-demo-2"}}
for event in graph.stream(initial_input, config2, stream_mode="values"):
pass
print("\n👤 用户输入: reject")
for event in graph.stream(Command(resume="reject"), config2, stream_mode="values"):
pass
print(f"\n📊 最终结果: {graph.get_state(config2).values['result']}")
# --------------------------
# 8. 场景 3:用户修改参数
# --------------------------
print("\n" + "=" * 60)
print("📘 场景 3:用户修改参数后执行")
print("=" * 60)
config3 = {"configurable": {"thread_id": "hitl-demo-3"}}
for event in graph.stream(initial_input, config3, stream_mode="values"):
pass
print("\n👤 用户输入: modify:新的收件人")
for event in graph.stream(Command(resume="modify:新的收件人"), config3, stream_mode="values"):
pass
final_state = graph.get_state(config3).values
print(f"\n📊 最终结果: {final_state['result']}")
print(f"📊 修改后参数: {final_state['task_params']}")
# --------------------------
# 9. 演示:输入验证循环
# --------------------------
print("\n" + "=" * 60)
print("🔄 演示:输入验证循环(多次 interrupt)")
print("=" * 60)
class ValidationState(TypedDict):
name: str
age: int
def collect_age(state: ValidationState):
"""收集年龄,验证直到有效"""
prompt = "请输入您的年龄(1-150的整数):"
while True:
answer = interrupt(prompt)
try:
age = int(answer)
if 1 <= age <= 150:
print(f"✅ 年龄验证通过: {age}")
return {"age": age}
else:
prompt = f"'{answer}' 不在有效范围(1-150),请重新输入:"
except ValueError:
prompt = f"'{answer}' 不是数字,请输入有效年龄:"
def show_result(state: ValidationState):
"""显示结果"""
print(f"\n📊 收集完成!{state['name']} 的年龄是 {state['age']} 岁")
return state
# 构建验证图
val_builder = StateGraph(ValidationState)
val_builder.add_node("collect_age", collect_age)
val_builder.add_node("show_result", show_result)
val_builder.add_edge(START, "collect_age")
val_builder.add_edge("collect_age", "show_result")
val_builder.add_edge("show_result", END)
val_memory = InMemorySaver()
val_graph = val_builder.compile(checkpointer=val_memory)
config4 = {"configurable": {"thread_id": "validation-demo"}}
# 首次执行
print("\n▶️ 开始收集年龄...")
for event in val_graph.stream({"name": "张三", "age": 0}, config4, stream_mode="values"):
pass
# 输入无效值
print("\n👤 用户输入: 'abc' (无效)")
for event in val_graph.stream(Command(resume="abc"), config4, stream_mode="values"):
pass
# 再次输入无效值
print("\n👤 用户输入: '-5' (无效)")
for event in val_graph.stream(Command(resume="-5"), config4, stream_mode="values"):
pass
# 输入有效值
print("\n👤 用户输入: '25' (有效)")
for event in val_graph.stream(Command(resume="25"), config4, stream_mode="values"):
pass
print("\n✨ HITL 演示完成!")运行结果示例:
📊 图结构可视化:
[显示图:START → prepare → approval → router → execute_task/cancel_task → END]
============================================================
📗 场景 1:用户批准任务
============================================================
▶️ 开始执行(会在审批处暂停)...
📋 准备执行任务: 发送邮件
参数: {'to': 'user@example.com', 'subject': '测试'}
⏸️ 需要用户审批...
📍 当前暂停于: ('approval',)
📌 中断信息: {'type': 'approval_request', 'task': '发送邮件', ...}
👤 用户输入: approve
✅ 收到用户反馈: approve
🚀 执行任务: 发送邮件
使用参数: {'to': 'user@example.com', 'subject': '测试'}
结果: 任务 '发送邮件' 执行成功...
📊 最终结果: 任务 '发送邮件' 执行成功...
============================================================
📕 场景 2:用户拒绝任务
============================================================
...
📊 最终结果: 任务已取消
============================================================
🔄 演示:输入验证循环(多次 interrupt)
============================================================
👤 用户输入: 'abc' (无效)
[系统等待新输入...]
👤 用户输入: '25' (有效)
✅ 年龄验证通过: 25
📊 收集完成!张三 的年龄是 25 岁代码要点说明:
| 要点 | 说明 |
|---|---|
interrupt(payload) | 暂停执行,payload 返回给调用方(必须可 JSON 序列化) |
Command(resume=value) | 恢复执行,将 value 作为 interrupt 的返回值 |
Command(goto="node") | 路由到指定节点 |
Command(goto="node", update={...}) | 路由并同时更新状态 |
InMemorySaver() | 内存 Checkpointer(开发用) |
thread_id | 区分不同会话/用户 |
HITL 三大模式:
| 模式 | 说明 | 实现 |
|---|---|---|
| 审批 (Approve) | 用户确认后才执行 | interrupt() → 用户输入 → Command(resume=) |
| 编辑 (Edit) | 用户修改 AI 结果 | interrupt() → 用户修改 → Command(update={...}) |
| 输入 (Input) | 收集用户信息 | while True: interrupt() 循环验证 |
生产环境最佳实践
1. 使用持久化 Checkpointer
开发时使用 InMemorySaver,生产环境使用持久化存储:
python
# 开发环境
from langgraph.checkpoint.memory import InMemorySaver
memory = InMemorySaver()
# 生产环境(推荐 PostgreSQL)
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
async with AsyncPostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/db"
) as checkpointer:
graph = builder.compile(checkpointer=checkpointer)2. 避免常见错误
python
# 错误:不要用 try-except 包裹 interrupt
def bad_node(state):
try:
result = interrupt("prompt") # interrupt 使用异常机制!
except:
pass # 这会吞掉 interrupt 异常
# 正确:直接调用
def good_node(state):
result = interrupt("prompt")
return {"data": result}
# 错误:interrupt 前的副作用不是幂等的
def bad_side_effect(state):
send_notification("开始处理") # 恢复时会重复发送!
result = interrupt("confirm?")
return {"result": result}
# 正确:确保副作用幂等或放在 interrupt 之后
def good_side_effect(state):
result = interrupt("confirm?")
send_notification("处理完成") # 在 interrupt 之后
return {"result": result}3. interrupt 的 payload 必须可 JSON 序列化
python
# 错误:传递函数或复杂对象
result = interrupt(lambda x: x) # 函数不能 JSON 序列化
# 正确:使用字典、列表、字符串等基本类型
result = interrupt({
"type": "approval",
"message": "请确认",
"options": ["yes", "no"]
})参考资料
官方文档
- LangGraph Interrupts 官方文档
- How to wait for user input using interrupt
- LangChain 博客:Making it easier to build human-in-the-loop agents with interrupt
教程与指南
- Human-in-the-Loop (HITL) with LangGraph: A Practical Guide | Towards AI
- Human-in-the-Loop with LangGraph: A Beginner's Guide | Medium
- LangGraph Human-in-the-loop (HITL) Deployment with FastAPI | Medium
- Interrupts and Commands in LangGraph | DEV Community
- IBM Tutorial: Human in the loop
中文资源
设计模式
- LangGraph HITL Design Patterns: Approve and Reject | Medium
- Human-in-the-Loop Agent Using Interrupt and Command | Medium
小结
| 概念 | 一句话总结 |
|---|---|
| HITL | 让人类参与 AI 决策的机制 |
| interrupt() | LangGraph 1.0 的"暂停键",随时暂停等待输入 |
| Command(resume=) | 恢复执行并传入人类输入的"遥控器" |
| Checkpointer | 保存执行进度的"存档器",必须配置 |
| thread_id | 区分不同会话的唯一标识 |
核心心智模型:把 interrupt() 想象成游戏中的暂停键,Command(resume=) 是继续键,Checkpointer 是自动存档功能。有了这三个,你的 AI Agent 就可以在任何时候暂停、等待人类指示、然后继续执行了!