Skip to content

人机协作与时间回溯

在构建 AI 应用时,有两个场景非常常见:

  1. 让人类参与决策:AI 给出建议,但关键决定由人来做
  2. 回到过去重来:不满意当前结果,回到之前的状态重新尝试

LangGraph 1.x 提供了两个强大的特性来支持这些场景:Human-in-the-Loop (HITL)Time Travel


什么是 Human-in-the-Loop?

通俗解释

想象你在使用一个 AI 翻译工具:

  • 没有 HITL:AI 翻译完就直接发送,你没有机会检查
  • 有 HITL:AI 翻译完后会暂停,问你"确认发送吗?",你说"好"才继续

这就是 Human-in-the-Loop —— 让人类在关键节点介入决策

为什么需要 HITL?

场景没有 HITL 的风险有 HITL 的保障
AI 客服自动发送不当回复敏感回复需人工确认
自动交易执行错误的买卖指令大额交易需人工批准
内容审核误删或漏删内容边界案例由人工判断
代码生成直接执行危险代码用户确认后才执行

LangGraph 的 HITL 实现

LangGraph 1.x 使用 interrupt()Command 来实现 HITL:

python
from langgraph.types import interrupt, Command

def review_node(state):
    # 暂停执行,等待人工决策
    decision = interrupt({
        "question": "是否批准这个操作?",
        "data": state["pending_action"]
    })

    if decision:
        return Command(goto="execute")
    else:
        return Command(goto="cancel")

工作原理

  1. interrupt()暂停 Graph 执行
  2. 返回一个包含 __interrupt__ 的结果给调用者
  3. 调用者收集人工决策后,用 Command(resume=value) 恢复执行
  4. interrupt() 返回 resume 的值,继续执行后续逻辑

HITL 实现的关键要点

在实际项目中实现人机协作时,需要注意以下几点:

要点说明
必须配置 Checkpointer否则无法保存中断时的任务状态
必须指定 thread_id用于后续恢复任务时找到正确的会话
中断时间有限制中断时间过长可能导致无法恢复(取决于 Checkpointer 配置)
resume 值可以是任意类型可以是 True/False,也可以是字典等复杂数据

典型流程图

                    ┌─────────────┐
                    │   调用 LLM   │
                    └──────┬──────┘


                    ┌─────────────┐
                    │  人工审核点  │
                    │ interrupt() │
                    └──────┬──────┘

              ┌────────────┼────────────┐
              │            │            │
              ▼            │            ▼
        ┌─────────┐        │      ┌─────────┐
        │  批准   │        │      │  拒绝   │
        │ goto=A  │        │      │ goto=B  │
        └────┬────┘        │      └────┬────┘
             │             │           │
             ▼             │           ▼
      [继续执行...]        │     [终止或其他处理]

案例一:智能翻译审核系统

场景描述

构建一个翻译系统,普通内容直接输出,但涉及法律、医疗等敏感内容时,需要人工审核。

工作流程

用户输入 → 翻译 → 检测敏感词

            是敏感内容?
           /          \
         是            否
          ↓             ↓
     [人工审核]     直接输出
      /      \
   通过      拒绝
     ↓         ↓
   输出      终止

核心代码解析

1. 定义状态

python
from typing import TypedDict

class TranslationState(TypedDict):
    source_text: str       # 原文
    source_lang: str       # 源语言
    target_lang: str       # 目标语言
    translated_text: str   # 翻译结果
    is_sensitive: bool     # 是否敏感
    final_output: str      # 最终输出

2. 翻译节点

python
def translate_text(state: TranslationState):
    # 调用 LLM 翻译
    response = llm.invoke(f"翻译: {state['source_text']}")

    # 检测敏感词
    sensitive_words = ["法律", "医疗", "合同"]
    is_sensitive = any(w in state['source_text'] for w in sensitive_words)

    return {
        "translated_text": response.content,
        "is_sensitive": is_sensitive
    }

3. 人工审核节点(HITL 核心)

python
from langgraph.types import interrupt, Command
from typing import Literal

def human_review(state: TranslationState) -> Command[Literal["output", "__end__"]]:
    # 使用 interrupt 暂停,等待人工决策
    approved = interrupt({
        "message": "请审核翻译结果",
        "original": state["source_text"],
        "translated": state["translated_text"]
    })

    if approved:
        return Command(goto="output")
    else:
        return Command(goto=END, update={"final_output": "翻译已被拒绝"})

小白提示

  • interrupt() 就像按下"暂停键",Graph 会停在这里
  • 返回值 approved 来自后续的 Command(resume=True/False)
  • Command(goto=...) 决定下一步去哪个节点

4. 构建 Graph

python
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import InMemorySaver

builder = StateGraph(TranslationState)

builder.add_node("translate", translate_text)
builder.add_node("human_review", human_review)
builder.add_node("output", output_result)

builder.add_edge(START, "translate")
builder.add_conditional_edges("translate", should_review)
builder.add_edge("output", END)

# 必须使用 checkpointer 才能支持 interrupt
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

5. 使用示例

python
config = {"configurable": {"thread_id": "translation-001"}}

# 第一步:提交翻译(敏感内容会暂停)
result = graph.invoke({
    "source_text": "根据合同法律条款...",
    "source_lang": "中文",
    "target_lang": "英文"
}, config=config)

print(result)  # 包含 __interrupt__,说明在等待审核

# 第二步:人工批准
final_result = graph.invoke(Command(resume=True), config=config)
print(final_result["final_output"])  # 输出翻译结果

# 或者拒绝
# final_result = graph.invoke(Command(resume=False), config=config)

什么是 Time Travel?

通俗解释

想象你在玩一个剧情游戏:

  • 没有 Time Travel:选错了对话选项,只能重新开始
  • 有 Time Travel:可以"读档",回到之前的存档点重新选择

LangGraph 的 Time Travel 就是这个"存档读档"功能 —— 可以回溯到任意历史状态,从那里重新执行

为什么需要 Time Travel?

场景价值
调试发现问题后,回到问题发生前的状态分析原因
探索从同一起点尝试不同的路径,对比结果
A/B 测试保持相同的前序状态,测试不同的后续处理
容错某一步出错后,回退到上一步重试

LangGraph 的 Time Travel 实现

LangGraph 通过 Checkpointer 自动保存每一步的状态快照:

python
# 获取所有历史状态
states = list(graph.get_state_history(config))

for state in states:
    print(f"Checkpoint: {state.config['configurable']['checkpoint_id']}")
    print(f"Next node: {state.next}")
    print(f"Values: {state.values}")

工作原理

  1. 每次节点执行后,Checkpointer 自动保存状态快照
  2. 每个快照有唯一的 checkpoint_id
  3. 指定 checkpoint_id 可以从该快照恢复执行

Time Travel 实现的三个关键步骤

步骤一:执行并记录

python
import uuid

# 生成唯一的会话 ID
config = {"configurable": {"thread_id": str(uuid.uuid4())}}

# 正常执行 Graph
result = graph.invoke(initial_input, config)

要点:每次执行后,每个节点完成时都会自动生成一个 checkpoint。

步骤二:查看历史检查点

python
# 获取所有 checkpoint
states = list(graph.get_state_history(config))

for i, state in enumerate(states):
    print(f"--- Checkpoint {i} ---")
    print(f"下一步: {state.next}")           # 该检查点之后要执行的节点
    print(f"ID: {state.config['configurable']['checkpoint_id']}")
    print(f"状态值: {state.values}")

输出示例

--- Checkpoint 0 ---
下一步: ()                    # 空元组表示已完成
ID: abc-123

--- Checkpoint 1 ---
下一步: ('ending',)           # 下一步是 ending 节点
ID: abc-122

--- Checkpoint 2 ---
下一步: ('development',)      # 下一步是 development 节点
ID: abc-121

步骤三:回溯并重新执行

python
# 选择要回溯的检查点
target_checkpoint = states[2]  # 假设我们要从 development 重新开始

# 可选:修改状态再重新执行
new_config = graph.update_state(
    target_checkpoint.config,
    values={"author": "鲁迅"}  # 修改状态中的某个值
)

# 从该检查点继续执行(传入 None 表示使用检查点的状态)
new_result = graph.invoke(None, new_config)

update_state 的妙用

update_state 方法可以在回溯前修改状态,这在以下场景非常有用:

场景做法
纠正错误发现某步骤的输出有误,回溯后修正
A/B 测试保持前序状态,测试不同的后续参数
手动干预人工修改中间结果,然后继续执行
调试注入特定值来测试边界条件

案例二:故事创作助手

场景描述

构建一个故事创作助手,生成"开头 → 发展 → 结局"。如果对结局不满意,可以"时间回溯"到开头,重新生成不同的发展和结局。

工作流程

主题输入 → 生成开头 → 生成发展 → 生成结局
              ↑______________|
              (Time Travel 回溯)

核心代码解析

1. 定义状态

python
from typing import TypedDict
from typing_extensions import NotRequired

class StoryState(TypedDict):
    theme: str                    # 故事主题
    opening: NotRequired[str]     # 开头
    development: NotRequired[str] # 发展
    ending: NotRequired[str]      # 结局

小白提示NotRequired 表示这个字段是可选的。故事刚开始时只有 theme,其他字段会逐步填充。

2. 故事节点

python
def generate_opening(state: StoryState):
    prompt = f"根据主题「{state['theme']}」写一个故事开头,50字以内"
    response = llm.invoke(prompt)
    return {"opening": response.content}

def generate_development(state: StoryState):
    prompt = f"故事开头:{state['opening']}\n请续写发展,80字以内"
    response = llm.invoke(prompt)
    return {"development": response.content}

def generate_ending(state: StoryState):
    prompt = f"开头:{state['opening']}\n发展:{state['development']}\n写结局"
    response = llm.invoke(prompt)
    return {"ending": response.content}

3. 构建 Graph

python
builder = StateGraph(StoryState)

builder.add_node("opening", generate_opening)
builder.add_node("development", generate_development)
builder.add_node("ending", generate_ending)

builder.add_edge(START, "opening")
builder.add_edge("opening", "development")
builder.add_edge("development", "ending")
builder.add_edge("ending", END)

# Time Travel 必须使用 checkpointer
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

4. 第一次创作

python
import uuid

config = {"configurable": {"thread_id": str(uuid.uuid4())}}

story = graph.invoke({"theme": "程序员发现时间旅行秘密"}, config=config)

print(f"开头: {story['opening']}")
print(f"发展: {story['development']}")
print(f"结局: {story['ending']}")

5. 查看 Checkpoints

python
states = list(graph.get_state_history(config))

for i, state in enumerate(states):
    print(f"--- Checkpoint {i} ---")
    print(f"Next: {state.next}")
    print(f"ID: {state.config['configurable']['checkpoint_id']}")

输出示例:

--- Checkpoint 0 ---
Next: ()                      # 已完成
ID: abc-123

--- Checkpoint 1 ---
Next: ('ending',)             # 下一步是 ending
ID: abc-122

--- Checkpoint 2 ---
Next: ('development',)        # 下一步是 development  ← 我们要回到这里
ID: abc-121

--- Checkpoint 3 ---
Next: ('opening',)
ID: abc-120

6. Time Travel:重新生成

python
# 找到开头完成后的 checkpoint
target_state = None
for state in states:
    if state.next == ('development',):
        target_state = state
        break

# 创建回溯配置
time_travel_config = {
    "configurable": {
        "thread_id": target_state.config["configurable"]["thread_id"],
        "checkpoint_id": target_state.config["configurable"]["checkpoint_id"],
    }
}

# 从该 checkpoint 重新执行
new_story = graph.invoke(None, config=time_travel_config)

print(f"保留的开头: {new_story['opening']}")  # 和之前一样
print(f"新的发展: {new_story['development']}")  # 重新生成
print(f"新的结局: {new_story['ending']}")       # 重新生成

关键点

  • invoke(None, config) 表示"从指定的 checkpoint 继续执行"
  • 开头保持不变(因为是从开头完成后的状态开始)
  • 发展和结局会重新生成(LLM 的随机性会产生不同结果)

interrupt() 详解

基本语法

python
from langgraph.types import interrupt

def node_with_interrupt(state):
    # interrupt 的参数会作为 __interrupt__ 返回给调用者
    result = interrupt({
        "question": "请选择一个选项",
        "options": ["A", "B", "C"]
    })

    # result 是用户通过 Command(resume=value) 传入的值
    return {"user_choice": result}

完整流程

1. graph.invoke(inputs, config)

2. 执行到 interrupt()

3. 返回 {"__interrupt__": [...], ...other_state}

4. 调用者收集用户输入

5. graph.invoke(Command(resume=user_input), config)

6. interrupt() 返回 user_input,继续执行

常见用法

1. 简单确认

python
confirmed = interrupt({"message": "确认执行?"})
if confirmed:
    # 执行操作

2. 多选项

python
choice = interrupt({
    "question": "选择处理方式",
    "options": {"A": "重试", "B": "跳过", "C": "终止"}
})

3. 收集额外输入

python
extra_info = interrupt({
    "message": "请提供更多信息",
    "current_state": state
})
# extra_info 可以是任意类型的数据

Command 详解

基本用法

python
from langgraph.types import Command
from langgraph.constants import END

# 1. 恢复执行并传值
Command(resume=True)
Command(resume={"name": "Alice", "age": 25})

# 2. 指定下一个节点
Command(goto="next_node")
Command(goto=END)  # 结束执行

# 3. 更新状态
Command(update={"status": "approved"})

# 4. 组合使用
Command(goto="process", update={"approved": True})

在节点中使用

python
def decision_node(state) -> Command[Literal["approve", "reject", "__end__"]]:
    decision = interrupt({"question": "请选择"})

    if decision == "approve":
        return Command(goto="approve", update={"status": "approved"})
    elif decision == "reject":
        return Command(goto="reject", update={"status": "rejected"})
    else:
        return Command(goto=END)

类型提示Command[Literal["a", "b"]] 告诉 LangGraph 这个节点可能跳转到哪些节点,用于构建正确的图结构。


Checkpointer 选项

1. InMemorySaver(内存)

python
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()
  • 优点:简单快速
  • 缺点:程序重启后丢失
  • 适用:开发测试

2. SqliteSaver(SQLite)

python
from langgraph.checkpoint.sqlite import SqliteSaver

checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
  • 优点:持久化到文件
  • 缺点:单机使用
  • 适用:单用户应用

3. PostgresSaver(PostgreSQL)

python
from langgraph.checkpoint.postgres import PostgresSaver

checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/db"
)
  • 优点:生产级、支持多用户
  • 缺点:需要数据库
  • 适用:生产环境

本章总结

Human-in-the-Loop (HITL)

概念作用
interrupt(value)暂停执行,返回 value 给调用者
Command(resume=x)恢复执行,x 作为 interrupt 返回值
Command(goto="node")指定下一步执行的节点
Command(update={...})更新状态

适用场景:人工审核、敏感操作确认、关键决策点

Time Travel

概念作用
get_state_history(config)获取所有历史 checkpoint
checkpoint_idcheckpoint 的唯一标识
invoke(None, config)从指定 checkpoint 继续执行

适用场景:调试回溯、A/B 测试、探索不同路径

关键依赖

两个特性都必须配置 Checkpointer

python
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

没有 Checkpointer:

  • interrupt() 无法暂停和恢复
  • 无法保存历史状态,Time Travel 不可用

思考题

  1. 如果一个 Graph 有多个 interrupt() 节点,执行顺序是怎样的?
  2. Time Travel 回溯后,原来的历史状态会被覆盖吗?
  3. 如何实现"审核失败后修改内容,重新提交审核"的流程?
  4. Checkpointer 会保存哪些数据?是否包含 LLM 的响应?

下一步

掌握了 HITL 和 Time Travel 后,你已经可以构建可靠、可控、可调试的 AI 应用了。建议:

  1. 尝试在自己的项目中加入人工审核节点
  2. 使用 Time Travel 调试复杂的多步骤流程
  3. 探索生产环境的 Checkpointer 配置(PostgreSQL)
  4. 结合 Stream 实现实时的审核界面

基于 MIT 许可证发布。内容版权归作者所有。