LangGraph Time Travel 详细解读
📚 概述
本文档详细解读 LangGraph 中的 Time Travel(时间旅行) 功能。这是一项强大的调试和状态管理能力,允许你查看、回放,甚至从图的历史状态中"分叉"出新的执行路径。通过 Time Travel,你可以像操作版本控制系统一样管理 Agent 的执行历史。
🎯 核心概念
什么是 Time Travel?
Time Travel 是 LangGraph 提供的一种高级状态管理功能,包含三个核心能力:
浏览历史(Browsing History)
- 查看图执行过程中的所有历史状态
- 访问每个 checkpoint(检查点)的完整数据
- 了解图在每个步骤的状态快照
回放(Replaying)
- 从历史中的某个 checkpoint 重新执行图
- 用于复现问题或验证修复
- 不改变历史,只是重新运行
分叉(Forking)
- 从历史状态创建新的执行分支
- 修改历史状态并继续执行
- 探索"如果当时输入不同会怎样"的场景
为什么需要 Time Travel?
在开发 AI Agent 时,我们经常遇到这些需求:
- 调试问题:Agent 在第 5 步出错了,想回到第 3 步重新执行
- 探索假设:如果用户输入的是 "5 * 3" 而不是 "2 * 3",结果会怎样?
- 恢复执行:Agent 意外中断,想从上次的状态继续
- 测试修改:修改了 Agent 逻辑,想用历史数据测试新版本
Human-in-the-Loop 的三大应用
Time Travel 是 Human-in-the-Loop 模式的重要组成部分:
应用 | 说明 | 使用场景 |
---|---|---|
Approval(审批) | 在关键节点暂停,等待人类批准 | 执行敏感操作前确认 |
Debugging(调试) | 回放历史状态,重现问题 | 排查 bug,验证修复 |
Editing(编辑) | 修改状态,改变执行路径 | 纠正错误,探索可能性 |
🎭 实战案例:数学计算 Agent
我们将构建一个简单的数学计算 Agent,演示 Time Travel 的完整功能:
Agent 功能:
- 接收用户的数学问题(如 "Multiply 2 and 3")
- 调用工具执行计算
- 返回结果
Time Travel 演示:
- 浏览 Agent 的执行历史
- 回放某个历史状态
- 分叉历史状态并修改输入
系统架构图
用户输入:"Multiply 2 and 3"
↓
[assistant] 解析意图 → 调用 multiply tool
↓
[tools] 执行计算 → 返回结果
↓
[assistant] 生成回复 → "The result is 6"
↓
(END)
每一步都会保存 checkpoint,可以随时回溯!
🔧 代码实现详解
1. 定义工具函数
def multiply(a: int, b: int) -> int:
"""Multiply a and b.
Args:
a: first int
b: second int
"""
return a * b
def add(a: int, b: int) -> int:
"""Adds a and b.
Args:
a: first int
b: second int
"""
return a + b
def divide(a: int, b: int) -> float:
"""Divide a by b.
Args:
a: first int
b: second int
"""
return a / b
tools = [add, multiply, divide]
说明:
- 定义三个基本数学运算工具
- 使用标准的 docstring 格式描述工具功能
- LLM 会根据 docstring 理解工具的用途
2. 绑定工具到 LLM
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")
llm_with_tools = llm.bind_tools(tools)
LangChain 知识点:bind_tools
bind_tools()
将工具注册到 LLM,使其能够:
- 理解工具的功能
- 决定何时调用工具
- 生成工具调用的参数
3. 定义图结构
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
# 系统提示词
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")
# 定义 assistant 节点
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
# 创建图
builder = StateGraph(MessagesState)
# 添加节点
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
# 添加边
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
tools_condition, # 自动判断是否需要调用工具
)
builder.add_edge("tools", "assistant")
# 编译图(带 checkpointer)
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
关键概念解析:
MessagesState
这是 LangGraph 内置的状态类,专门用于管理对话消息:
class MessagesState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
# add_messages 是内置 reducer,自动合并消息
MemorySaver
这是 LangGraph 的 checkpointer(检查点管理器):
checkpointer=MemorySaver()
作用:
- 在每个节点执行后自动保存状态
- 存储在内存中(适合开发测试)
- 生产环境可以使用持久化的 checkpointer(如数据库)
重要: Time Travel 功能依赖 checkpointer!没有 checkpointer 就没有历史状态。
tools_condition
这是 LangGraph 预构建的条件函数:
tools_condition(state)
功能:
- 检查最后一条消息是否包含工具调用
- 如果是工具调用 → 路由到 "tools" 节点
- 如果不是 → 路由到 END
4. 执行图
# 输入
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}
# 线程配置(用于标识会话)
thread = {"configurable": {"thread_id": "1"}}
# 运行图
for event in graph.stream(initial_input, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
输出:
================================ Human Message =================================
Multiply 2 and 3
================================== Ai Message ==================================
Tool Calls:
multiply (call_ikJxMpb777bKMYgmM3d9mYjW)
Args:
a: 2
b: 3
================================= Tool Message =================================
Name: multiply
6
================================== Ai Message ==================================
The result of multiplying 2 and 3 is 6.
执行流程:
- 用户输入 "Multiply 2 and 3"
- Assistant 节点识别需要调用
multiply
工具 - Tools 节点执行
multiply(2, 3)
返回6
- Assistant 节点生成最终回复
Python 知识点:thread_id
thread = {"configurable": {"thread_id": "1"}}
thread_id
用于唯一标识一个会话- 同一个
thread_id
的所有执行共享历史状态 - 不同的
thread_id
拥有独立的状态和历史
🔍 浏览历史(Browsing History)
查看当前状态
graph.get_state({'configurable': {'thread_id': '1'}})
返回的 StateSnapshot:
StateSnapshot(
values={
'messages': [
HumanMessage(content='Multiply 2 and 3', id='4ee8c440-...'),
AIMessage(content='', tool_calls=[...], id='run-bc24d334-...'),
ToolMessage(content='6', name='multiply', id='1012611a-...'),
AIMessage(content='The result of multiplying 2 and 3 is 6.', id='run-b46f3fed-...')
]
},
next=(), # 下一个要执行的节点(空表示已完成)
config={
'configurable': {
'thread_id': '1',
'checkpoint_ns': '',
'checkpoint_id': '1ef6a440-ac9e-6024-8003-6fd8435c1d3b'
}
},
metadata={'source': 'loop', 'step': 3, ...},
created_at='2024-09-03T22:29:54.309727+00:00',
parent_config={...}, # 父 checkpoint
tasks=()
)
StateSnapshot 字段解析:
字段 | 说明 | 用途 |
---|---|---|
values | 当前状态的完整数据 | 查看状态内容 |
next | 下一个要执行的节点 | 判断执行进度 |
config | 配置信息(包含 checkpoint_id) | 用于回放或分叉 |
metadata | 元数据(步数、来源等) | 调试和追踪 |
created_at | 创建时间 | 时间线分析 |
parent_config | 父 checkpoint 的配置 | 追溯历史 |
tasks | 待执行的任务 | 查看中断状态 |
获取完整历史
all_states = [s for s in graph.get_state_history(thread)]
len(all_states) # 输出:5
历史结构:
all_states[0] # 当前状态(最新)
all_states[1] # 倒数第 2 个状态
all_states[2] # 倒数第 3 个状态
...
all_states[-1] # 初始状态(最早)
可视化历史:
Step 0 (all_states[-1]): 用户输入
↓
Step 1 (all_states[-2]): Assistant 生成工具调用
↓
Step 2 (all_states[-3]): Tools 执行计算
↓
Step 3 (all_states[-4]): Assistant 生成最终回复
↓
Step 4 (all_states[0]): 当前状态(完成)
查看特定历史状态
# 查看第一次接收用户输入后的状态
to_replay = all_states[-2]
# 查看状态内容
to_replay.values
# 输出:{'messages': [HumanMessage(content='Multiply 2 and 3', ...)]}
# 查看下一个要执行的节点
to_replay.next
# 输出:('assistant',)
# 查看 checkpoint ID
to_replay.config
# 输出:{'configurable': {'thread_id': '1', 'checkpoint_id': '1ef6a440-...'}}
🔄 回放(Replaying)
回放允许你从历史中的某个 checkpoint 重新执行图。
核心概念
回放 vs 继续执行:
操作 | 说明 | 状态 |
---|---|---|
继续执行 | 从当前状态继续未完成的任务 | 使用当前 checkpoint |
回放 | 从历史状态重新执行已完成的任务 | 使用历史 checkpoint |
回放示例
# 获取要回放的历史状态
to_replay = all_states[-2]
# 从该 checkpoint 开始重新执行
for event in graph.stream(None, to_replay.config, stream_mode="values"):
event['messages'][-1].pretty_print()
关键点:
- 第一个参数是
None
:不提供新输入,使用 checkpoint 中保存的状态 - 第二个参数是
to_replay.config
:包含checkpoint_id
,指定从哪个历史状态开始 - 图会自动识别这是一个已执行过的 checkpoint,进行回放
输出:
================================ Human Message =================================
Multiply 2 and 3
================================== Ai Message ==================================
Tool Calls:
multiply (call_SABfB57CnDkMu9HJeUE0mvJ9)
Args:
a: 2
b: 3
================================= Tool Message =================================
Name: multiply
6
================================== Ai Message ==================================
The result of multiplying 2 and 3 is 6.
注意: 由于 LLM 的随机性,回放的结果可能与原始执行略有不同(例如工具调用 ID 不同)。
🌿 分叉(Forking)
分叉是 Time Travel 最强大的功能:从历史状态创建新的执行分支。
核心概念
分叉 vs 回放:
操作 | 状态 | 历史 | 用途 |
---|---|---|---|
回放 | 不修改状态 | 不改变历史 | 重现执行过程 |
分叉 | 修改状态 | 创建新分支 | 探索替代路径 |
分叉原理
原始历史:
┌─ Step 0: 用户输入 "2 * 3"
├─ Step 1: Assistant 调用工具
├─ Step 2: Tools 返回 6
└─ Step 3: Assistant 回复 "结果是 6"
分叉操作(在 Step 0 修改输入为 "5 * 3"):
┌─ Step 0: 用户输入 "2 * 3" (原始)
│
└─ Step 0': 用户输入 "5 * 3" (分叉) ← 从这里开始新的执行
├─ Step 1': Assistant 调用工具
├─ Step 2': Tools 返回 15
└─ Step 3': Assistant 回复 "结果是 15"
分叉实现
步骤 1:选择要分叉的状态
to_fork = all_states[-2] # 选择用户输入后的状态
to_fork.values["messages"]
# 输出:[HumanMessage(content='Multiply 2 and 3', id='4ee8c440-...')]
步骤 2:修改状态
fork_config = graph.update_state(
to_fork.config, # 指定要修改的 checkpoint
{"messages": [HumanMessage(
content='Multiply 5 and 3', # 修改后的内容
id=to_fork.values["messages"][0].id # ⭐ 保持相同的 ID!
)]},
)
关键点:Message ID 的作用
id=to_fork.values["messages"][0].id
为什么要保持相同的 ID?
LangGraph 的 add_messages
reducer 有特殊逻辑:
- 如果 ID 相同:覆盖原有消息(更新)
- 如果 ID 不同:追加新消息(插入)
# 示例:
# 原始消息:[HumanMessage(content='Multiply 2 and 3', id='abc')]
# 覆盖(使用相同 ID):
update({"messages": [HumanMessage(content='Multiply 5 and 3', id='abc')]})
# 结果:[HumanMessage(content='Multiply 5 and 3', id='abc')]
# 追加(使用不同 ID):
update({"messages": [HumanMessage(content='Multiply 5 and 3', id='xyz')]})
# 结果:[
# HumanMessage(content='Multiply 2 and 3', id='abc'),
# HumanMessage(content='Multiply 5 and 3', id='xyz')
# ]
步骤 3:从分叉的状态继续执行
for event in graph.stream(None, fork_config, stream_mode="values"):
event['messages'][-1].pretty_print()
输出:
================================ Human Message =================================
Multiply 5 and 3
================================== Ai Message ==================================
Tool Calls:
multiply (call_KP2CVNMMUKMJAQuFmamHB21r)
Args:
a: 5
b: 3
================================= Tool Message =================================
Name: multiply
15
================================== Ai Message ==================================
The result of multiplying 5 and 3 is 15.
步骤 4:验证分叉结果
# 查看当前状态
graph.get_state({'configurable': {'thread_id': '1'}})
# 查看历史
all_states = [state for state in graph.get_state_history(thread)]
all_states[0].values["messages"]
# 输出:[HumanMessage(content='Multiply 5 and 3', ...), ...]
注意: 分叉会创建一个新的 checkpoint 分支,但不会删除原始历史。
🌐 使用 LangGraph API 进行 Time Travel
LangGraph 提供了远程 API,可以通过网络调用 Time Travel 功能。
启动 LangGraph 开发服务器
在项目目录下运行:
cd module-3/studio
langgraph dev
输出:
- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs
连接到 API
from langgraph_sdk import get_client
client = get_client(url="http://127.0.0.1:2024")
通过 API 运行 Agent
from langchain_core.messages import HumanMessage
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}
# 创建线程
thread = await client.threads.create()
# 流式运行
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent", # 在 langgraph.json 中定义
input=initial_input,
stream_mode="updates",
):
if chunk.data:
assistant_node = chunk.data.get('assistant', {}).get('messages', [])
tool_node = chunk.data.get('tools', {}).get('messages', [])
if assistant_node:
print("--------------------Assistant Node--------------------")
print(assistant_node[-1])
elif tool_node:
print("--------------------Tools Node--------------------")
print(tool_node[-1])
stream_mode 说明:
模式 | 输出内容 | 用途 |
---|---|---|
"values" | 每个节点执行后的完整状态 | 查看状态变化 |
"updates" | 每个节点对状态的更新 | 查看增量变化 |
通过 API 回放
# 获取历史
states = await client.threads.get_history(thread['thread_id'])
to_replay = states[-2]
# 回放
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent",
input=None, # 不提供新输入
stream_mode="values",
checkpoint_id=to_replay['checkpoint_id'] # 指定要回放的 checkpoint
):
print(f"Receiving new event of type: {chunk.event}...")
print(chunk.data)
通过 API 分叉
# 获取要分叉的状态
states = await client.threads.get_history(thread['thread_id'])
to_fork = states[-2]
# 准备新输入
forked_input = {
"messages": HumanMessage(
content="Multiply 3 and 3",
id=to_fork['values']['messages'][0]['id'] # 保持相同 ID
)
}
# 更新状态(创建分叉)
forked_config = await client.threads.update_state(
thread["thread_id"],
forked_input,
checkpoint_id=to_fork['checkpoint_id']
)
# 从分叉状态继续执行
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent",
input=None,
stream_mode="updates",
checkpoint_id=forked_config['checkpoint_id']
):
# 处理输出
...
🎓 核心知识点总结
LangGraph 特有概念
1. Checkpointer(检查点管理器)
作用: 自动保存图的执行历史
from langgraph.checkpoint.memory import MemorySaver
graph = builder.compile(checkpointer=MemorySaver())
类型:
MemorySaver
:内存存储(开发测试)SqliteSaver
:SQLite 数据库(持久化)- 自定义 checkpointer(生产环境)
保存时机:
- 每个节点执行后自动保存
- 图完成后保存最终状态
- 中断时保存中断状态
2. StateSnapshot
状态快照,包含某个时刻的完整状态信息:
class StateSnapshot:
values: dict # 状态数据
next: tuple # 下一个节点
config: dict # 配置(包含 checkpoint_id)
metadata: dict # 元数据
created_at: datetime # 创建时间
parent_config: dict # 父 checkpoint
tasks: tuple # 待执行任务
3. Thread(线程)
用于标识和隔离不同的会话:
thread = {"configurable": {"thread_id": "user_123"}}
特性:
- 每个 thread 有独立的状态和历史
- 相同 thread_id 共享历史
- 可用于多用户、多会话场景
4. get_state_history()
获取完整历史的方法:
all_states = list(graph.get_state_history(thread))
# all_states[0] # 最新状态
# all_states[-1] # 最早状态
返回: 按时间倒序排列的 StateSnapshot 列表
5. update_state()
修改历史状态,创建分叉:
fork_config = graph.update_state(
checkpoint_config, # 要修改的 checkpoint
state_updates # 状态更新
)
返回: 新分叉的 checkpoint 配置
Python 特有知识点
1. MessagesState
LangGraph 内置的消息状态类:
from langgraph.graph import MessagesState
# 等价于:
class MessagesState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
add_messages reducer:
- 自动合并消息列表
- 相同 ID 的消息会被覆盖
- 不同 ID 的消息会被追加
2. Message ID 机制
所有 LangChain 消息都有 id
字段:
msg = HumanMessage(content="Hello", id="abc123")
用途:
- 唯一标识消息
- 用于消息去重
- 用于消息更新(分叉时覆盖消息)
自动生成: 如果不提供 ID,LangChain 会自动生成 UUID
3. 异步编程(async/await)
LangGraph API 使用异步接口:
# 异步函数定义
async def my_function():
result = await async_operation()
return result
# 在 Jupyter 中直接使用 await
thread = await client.threads.create()
# 在普通 Python 脚本中需要:
import asyncio
asyncio.run(my_function())
异步迭代:
async for chunk in client.runs.stream(...):
process(chunk)
💡 最佳实践
1. 何时使用 Time Travel?
✅ 适用场景:
- 调试复杂的 Agent 行为
- 复现用户报告的问题
- 探索不同输入的影响
- A/B 测试不同的 Agent 配置
- 恢复中断的执行
❌ 不适用场景:
- 生产环境的实时执行(性能开销)
- 简单的无状态操作
- 不需要历史追溯的场景
2. Checkpointer 选择
开发环境:
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver() # 简单快速,重启后丢失
测试环境:
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("checkpoints.db") # 持久化到文件
生产环境:
# 使用分布式数据库
# 例如:PostgreSQL、MongoDB 等
3. Thread ID 命名策略
按用户:
thread = {"configurable": {"thread_id": f"user_{user_id}"}}
按会话:
thread = {"configurable": {"thread_id": f"session_{session_id}"}}
按任务:
thread = {"configurable": {"thread_id": f"task_{task_id}_{timestamp}"}}
最佳实践:
- 使用有意义的 ID 便于调试
- 避免 ID 冲突
- 考虑添加时间戳或 UUID
4. 分叉时的注意事项
保持 Message ID:
# ✅ 正确:保持原始 ID 以覆盖消息
HumanMessage(content="New input", id=original_message.id)
# ❌ 错误:不提供 ID 会追加消息
HumanMessage(content="New input")
验证分叉结果:
# 分叉后检查状态
fork_state = graph.get_state(fork_config)
print(fork_state.values) # 确认状态已更新
清理旧分支:
- 分叉会创建新的历史分支
- 定期清理不需要的 checkpoint 以节省存储
5. 历史查询优化
限制历史数量:
# 只获取最近的 N 个状态
from itertools import islice
recent_states = list(islice(graph.get_state_history(thread), 10))
按条件筛选:
# 只获取特定节点的状态
states_at_assistant = [
s for s in graph.get_state_history(thread)
if 'assistant' in s.metadata.get('writes', {})
]
🚀 进阶技巧
1. 多分支探索
从同一个 checkpoint 创建多个分叉:
base_checkpoint = all_states[-2]
# 分叉 1:输入 "5 * 3"
fork1 = graph.update_state(
base_checkpoint.config,
{"messages": [HumanMessage(content="Multiply 5 and 3", id=msg_id)]}
)
# 分叉 2:输入 "10 * 3"
fork2 = graph.update_state(
base_checkpoint.config,
{"messages": [HumanMessage(content="Multiply 10 and 3", id=msg_id)]}
)
# 分别执行
for event in graph.stream(None, fork1):
# 处理分支 1
pass
for event in graph.stream(None, fork2):
# 处理分支 2
pass
2. 时间线可视化
def visualize_timeline(states):
for i, state in enumerate(reversed(states)):
step = len(states) - i - 1
timestamp = state.created_at.strftime("%H:%M:%S")
next_node = state.next[0] if state.next else "END"
print(f"Step {step} [{timestamp}] → {next_node}")
print(f" Messages: {len(state.values.get('messages', []))}")
print()
visualize_timeline(all_states)
3. 自动保存关键 Checkpoint
def save_important_checkpoints(thread):
checkpoints = []
for state in graph.get_state_history(thread):
# 保存所有用户输入的状态
if any(isinstance(m, HumanMessage) for m in state.values['messages']):
checkpoints.append({
'checkpoint_id': state.config['configurable']['checkpoint_id'],
'timestamp': state.created_at,
'messages': state.values['messages']
})
return checkpoints
4. 条件回放
根据条件决定是否回放:
def replay_if_error(state):
# 如果发现错误,回放到上一个稳定状态
if "error" in state.metadata.get('writes', {}):
history = list(graph.get_state_history(thread))
for prev_state in history:
if prev_state.next == ('assistant',): # 找到上一个 assistant 节点
print(f"Error detected, replaying from checkpoint {prev_state.config['configurable']['checkpoint_id']}")
for event in graph.stream(None, prev_state.config):
# 回放
pass
break
📊 Time Travel vs 其他调试方法
方法 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
Time Travel | 可回溯、可分叉、完整历史 | 需要 checkpointer、存储开销 | 复杂 Agent 调试 |
日志记录 | 简单、轻量 | 不可回放、无状态快照 | 简单调试、性能分析 |
Breakpoint | 实时暂停、可修改 | 需要提前设置、无历史回溯 | 开发阶段调试 |
单元测试 | 自动化、可重复 | 无法复现实际执行 | 功能验证 |
🎯 实际应用案例
案例 1:客服 Agent 错误修复
场景: 客服 Agent 在第 3 轮对话时给出了错误答案
解决方案:
- 使用
thread_id
获取该用户的历史 - 找到第 3 轮对话的 checkpoint
- 回放以重现问题
- 修改 Agent 逻辑
- 分叉重新执行验证修复
# 获取用户历史
thread = {"configurable": {"thread_id": f"user_{user_id}"}}
history = list(graph.get_state_history(thread))
# 找到第 3 轮对话
round_3 = [s for s in history if len(s.values['messages']) == 6][0] # 3 轮 = 6 条消息
# 回放重现问题
for event in graph.stream(None, round_3.config):
# 观察错误
pass
# 修复 Agent 后,分叉重新执行
fork = graph.update_state(round_3.config, {...})
for event in graph.stream(None, fork):
# 验证修复
pass
案例 2:A/B 测试不同提示词
场景: 测试两个不同的系统提示词哪个效果更好
base_state = history[-1] # 用户输入后的状态
# 测试提示词 A
fork_a = graph.update_state(
base_state.config,
{"system_prompt": "You are a helpful assistant..."}
)
result_a = list(graph.stream(None, fork_a))
# 测试提示词 B
fork_b = graph.update_state(
base_state.config,
{"system_prompt": "You are a concise assistant..."}
)
result_b = list(graph.stream(None, fork_b))
# 比较结果
compare_results(result_a, result_b)
案例 3:用户反馈修正
场景: 用户对 Agent 的回复不满意,想修改输入重新执行
# 用户最初的输入
original_input = "Multiply 2 and 3"
# Agent 给出了回复,用户不满意
# 用户修改输入为 "Multiply 2 and 3, then add 5"
# 获取原始输入的 checkpoint
history = list(graph.get_state_history(thread))
original_checkpoint = history[-2]
# 分叉并修改输入
fork = graph.update_state(
original_checkpoint.config,
{"messages": [HumanMessage(
content="Multiply 2 and 3, then add 5",
id=original_checkpoint.values['messages'][0].id
)]}
)
# 重新执行
for event in graph.stream(None, fork):
print(event)
🔍 常见问题
Q1: 没有 checkpointer 可以使用 Time Travel 吗?
不可以。 Time Travel 完全依赖 checkpointer 保存的历史状态。
# ❌ 没有 checkpointer - 无法使用 Time Travel
graph = builder.compile()
# ✅ 有 checkpointer - 可以使用 Time Travel
graph = builder.compile(checkpointer=MemorySaver())
Q2: 回放会产生副作用吗?
可能会。 如果节点中有副作用操作(如写入数据库、调用外部 API),回放时会再次执行这些操作。
解决方案:
- 在开发环境使用模拟数据
- 添加"回放模式"标志,跳过副作用操作
- 使用幂等操作
def my_node(state):
if state.get("replay_mode"):
return cached_result # 跳过实际操作
else:
return perform_real_operation() # 正常执行
Q3: 分叉会影响原始历史吗?
不会。 分叉创建一个新的 checkpoint 分支,原始历史保持不变。
原始历史:A → B → C → D
↓ (分叉)
A → B' → C' → D'
两条分支独立存在,互不影响
Q4: 如何清理历史 checkpoint?
目前 LangGraph 不提供自动清理功能,需要手动管理:
# 对于 SqliteSaver
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
# 手动删除数据库文件
import os
os.remove("checkpoints.db")
最佳实践:
- 为不同任务使用不同的 thread_id
- 定期清理旧的 thread
- 在测试时使用 MemorySaver(自动清理)
Q5: 可以跨图共享 checkpoint 吗?
不可以。 Checkpoint 与特定的图结构绑定。如果修改了图结构(如添加节点、修改边),旧的 checkpoint 可能无法正确回放。
建议:
- 使用版本化的图(graph_v1, graph_v2)
- 为每个版本使用独立的 checkpointer
- 迁移重要的历史数据时要谨慎
📖 扩展阅读
🎊 总结
Time Travel 是 LangGraph 提供的强大调试和状态管理工具。通过 checkpointer 自动保存的历史状态,我们可以:
- 浏览历史:查看图执行的每一个步骤
- 回放:重新执行历史状态,重现问题
- 分叉:修改历史状态,探索不同的执行路径
这三大功能让我们能够像使用版本控制系统一样管理 Agent 的执行历史,极大地提升了开发和调试效率。
核心要点:
- Time Travel 依赖 checkpointer
- Thread ID 用于标识和隔离会话
- Message ID 用于消息覆盖(分叉时必需)
- 分叉不影响原始历史
- 适合复杂 Agent 的调试和测试
掌握 Time Travel,你就掌握了 LangGraph 开发的"时光机"!