LangGraph 状态编辑与人工反馈详细解读
📚 概述
本文档详细解读 LangGraph 中的**状态编辑(State Editing)和人工反馈(Human Feedback)**机制。这是构建人机协作 AI 系统的核心技术,让我们能够在 Agent 执行过程中暂停、检查、修改状态,并引入人类智慧来指导 AI 决策。
🎯 核心概念
什么是状态编辑?
状态编辑(State Editing) 是 LangGraph 提供的一种能力,允许我们在图执行过程中:
- 暂停图的执行(使用断点)
- 检查当前状态
- 修改状态数据
- 继续执行
这是一种"时光机"能力,让我们可以干预 AI Agent 的执行流程。
人机协作的三种模式
在之前的学习中,我们了解了人机协作的三种主要模式:
审批模式(Approval)
- 在关键操作前暂停
- 展示 AI 计划执行的操作
- 等待人类批准或拒绝
- 示例:AI 计划删除数据前,先获得确认
调试模式(Debugging)
- 回溯图的执行历史
- 重现或避免错误
- 分析问题根源
- 示例:当 AI 出错时,回到之前的检查点重新执行
编辑模式(Editing) ⭐ 本课重点
- 直接修改图的状态
- 注入人类反馈或知识
- 纠正 AI 的理解偏差
- 示例:修正 AI 误解的用户意图
为什么需要状态编辑?
考虑以下场景:
场景 1:纠正 AI 理解
用户:"计算 2 × 3"
AI:准备执行...
[暂停]
人类:"不对,我想要 3 × 3"
[修改状态]
AI:执行修正后的计算 → 9
场景 2:注入专家知识
AI:分析数据,发现异常值...
[暂停]
专家:"这个异常值是预期的,是季节性因素"
[添加上下文到状态]
AI:基于专家反馈调整分析...
场景 3:引导探索方向
研究 Agent:规划调研 10 个主题...
[暂停]
研究员:"聚焦前 3 个即可,其他不重要"
[修改待处理列表]
Agent:执行精简后的计划...
🔧 实战案例 1:基础状态编辑
系统架构
我们构建一个数学计算 Agent,支持在执行前修改用户输入:
用户输入 "Multiply 2 and 3"
↓
[断点] interrupt_before=["assistant"]
↓
(检查状态)
↓
(修改状态) update_state()
↓
[assistant] 执行修正后的计算
↓
[tools] 调用工具
↓
[assistant] 返回结果
代码实现详解
1. 定义工具和模型
from langchain_openai import ChatOpenAI
def multiply(a: int, b: int) -> int:
"""Multiply a and b.
Args:
a: first int
b: second int
"""
return a * b
def add(a: int, b: int) -> int:
"""Adds a and b.
Args:
a: first int
b: second int
"""
return a + b
def divide(a: int, b: int) -> float:
"""Divide a by b.
Args:
a: first int
b: second int
"""
return a / b
tools = [add, multiply, divide]
llm = ChatOpenAI(model="gpt-4o")
llm_with_tools = llm.bind_tools(tools)
关键点:
- 定义了三个数学工具:加法、乘法、除法
- 每个工具都有清晰的文档字符串(docstring),LLM 会据此理解工具用途
bind_tools()
将工具绑定到 LLM,使其能够调用这些工具
Python 知识点:函数文档字符串(Docstring)
def multiply(a: int, b: int) -> int:
"""Multiply a and b. # 简短描述
Args: # 参数说明
a: first int
b: second int
"""
return a * b
这种格式遵循 Google Style 文档规范,LangChain 会自动解析并转换为工具描述,供 LLM 理解。
2. 构建图
from IPython.display import Image, display
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_core.messages import HumanMessage, SystemMessage
# 系统消息
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")
# 助手节点
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
# 构建图
builder = StateGraph(MessagesState)
# 定义节点
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
# 定义边
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
tools_condition, # 自动判断:工具调用 → tools,否则 → END
)
builder.add_edge("tools", "assistant")
# 编译图(关键:设置断点)
memory = MemorySaver()
graph = builder.compile(
interrupt_before=["assistant"], # ⭐ 在 assistant 前暂停
checkpointer=memory
)
LangGraph 知识点:interrupt_before
graph = builder.compile(interrupt_before=["assistant"], checkpointer=memory)
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^
# 在指定节点前设置断点
interrupt_before=["assistant"]
:在执行assistant
节点前暂停checkpointer=memory
:保存执行状态,支持恢复和修改- 断点会让图停在指定节点之前,等待人工干预
为什么需要 checkpointer?
Checkpointer 是状态管理的核心:
MemorySaver() # 内存存储(开发/测试)
# 生产环境可用:
# PostgresSaver() # 数据库存储
# RedisSaver() # Redis 存储
没有 checkpointer,图无法暂停和恢复!
3. 执行并触发断点
# 输入
initial_input = {"messages": "Multiply 2 and 3"}
# 线程配置(用于追踪会话)
thread = {"configurable": {"thread_id": "1"}}
# 运行图直到第一个断点
for event in graph.stream(initial_input, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
输出:
================================ Human Message =================================
Multiply 2 and 3
观察:
- 图接收到用户消息
- 在
assistant
节点前暂停(因为interrupt_before=["assistant"]
) - AI 还未执行,我们可以干预
4. 检查状态
state = graph.get_state(thread)
state
输出:
StateSnapshot(
values={'messages': [HumanMessage(content='Multiply 2 and 3', id='e7edcaba-...')]},
next=('assistant',), # ⭐ 下一个要执行的节点
config={'configurable': {'thread_id': '1', ...}},
...
)
LangGraph 知识点:StateSnapshot
StateSnapshot
是图在某个时间点的完整快照:
字段 | 说明 | 示例 |
---|---|---|
values | 当前状态数据 | {'messages': [...]} |
next | 待执行的节点 | ('assistant',) |
config | 会话配置 | {'thread_id': '1'} |
tasks | 待执行任务 | (PregelTask(...),) |
通过 next
,我们知道暂停在哪个节点前。
5. 修改状态 ⭐
graph.update_state(
thread,
{"messages": [HumanMessage(content="No, actually multiply 3 and 3!")]},
)
关键概念:Reducer 机制
MessagesState
的 messages
字段使用 add_messages
reducer:
class MessagesState(TypedDict):
messages: Annotated[list, add_messages] # 使用 add_messages reducer
Reducer 的行为:
如果提供消息 ID:覆盖已有消息
python# 假设原消息 id='abc' update_state({"messages": [HumanMessage(content="新内容", id='abc')]}) # 结果:原消息被替换
如果不提供消息 ID:追加新消息
pythonupdate_state({"messages": [HumanMessage(content="新消息")]}) # 结果:新消息被追加到列表末尾
在我们的例子中,没有提供 ID,所以是追加新消息。
6. 验证状态修改
new_state = graph.get_state(thread).values
for m in new_state['messages']:
m.pretty_print()
输出:
================================ Human Message =================================
Multiply 2 and 3
================================ Human Message =================================
No, actually multiply 3 and 3!
观察:
- 原始消息保留
- 新消息被追加
- AI 将看到两条消息的上下文
7. 继续执行
for event in graph.stream(None, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
重要:传递 None
表示从当前状态继续
输出:
================================ Human Message =================================
No, actually multiply 3 and 3!
================================== Ai Message ==================================
Tool Calls:
multiply (call_Mbu8...)
Args:
a: 3
b: 3
================================= Tool Message =================================
Name: multiply
9
流程分析:
- 从断点恢复,AI 看到修改后的消息
- AI 理解为计算 3 × 3
- 调用
multiply
工具 - 得到结果 9
8. 再次继续(处理工具返回)
for event in graph.stream(None, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
输出:
================================= Tool Message =================================
Name: multiply
9
================================== Ai Message ==================================
3 multiplied by 3 equals 9.
注意: 因为设置了 interrupt_before=["assistant"]
,所以在 assistant 节点前又暂停了一次。需要再调用 stream(None, ...)
才能得到最终回复。
🎭 实战案例 2:使用 LangGraph Studio
Studio 简介
LangGraph Studio 是可视化开发工具,可以:
- 可视化图结构
- 交互式调试
- 实时修改状态
- 查看执行历史
本地运行 Studio
cd /path/to/module-3/studio
langgraph dev
输出:
🚀 API: http://127.0.0.1:2024
🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
📚 API Docs: http://127.0.0.1:2024/docs
打开浏览器访问 Studio UI 即可使用可视化界面。
使用 LangGraph SDK
Studio 提供了 HTTP API,我们可以通过 SDK 编程调用:
1. 连接到 Studio
from langgraph_sdk import get_client
client = get_client(url="http://127.0.0.1:2024")
2. 创建线程并运行(带断点)
initial_input = {"messages": "Multiply 2 and 3"}
thread = await client.threads.create()
async for chunk in client.runs.stream(
thread["thread_id"],
"agent", # agent 名称(定义在 studio/agent.py)
input=initial_input,
stream_mode="values",
interrupt_before=["assistant"], # ⭐ SDK 支持动态设置断点
):
print(f"Receiving new event of type: {chunk.event}...")
messages = chunk.data.get('messages', [])
if messages:
print(messages[-1])
重要发现: 即使 studio/agent.py
中没有定义断点,我们也可以通过 API 动态注入!
输出:
Receiving new event of type: metadata...
--------------------------------------------------
Receiving new event of type: values...
{'content': 'Multiply 2 and 3', 'type': 'human', 'id': '...'}
--------------------------------------------------
3. 获取当前状态
current_state = await client.threads.get_state(thread['thread_id'])
current_state
输出:
{
'values': {
'messages': [{'content': 'Multiply 2 and 3', 'type': 'human', ...}]
},
'next': ['assistant'], # 下一个节点
'tasks': [...],
'metadata': {...},
...
}
4. 提取并编辑消息
# 获取最后一条消息
last_message = current_state['values']['messages'][-1]
last_message
输出:
{
'content': 'Multiply 2 and 3',
'type': 'human',
'id': '882dabe4-...',
...
}
修改消息内容:
last_message['content'] = "No, actually multiply 3 and 3!"
Python 知识点:字典修改
# 字典是可变对象,直接修改会影响原数据
msg = {'content': 'old'}
msg['content'] = 'new'
print(msg) # {'content': 'new'}
5. 更新状态到服务器
await client.threads.update_state(
thread['thread_id'],
{"messages": last_message}
)
关键: 因为我们保留了消息的 id
,所以 add_messages
reducer 会覆盖原消息,而非追加。
6. 恢复执行
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent",
input=None, # ⭐ 传递 None 表示从当前状态继续
stream_mode="values",
interrupt_before=["assistant"],
):
print(f"Receiving new event of type: {chunk.event}...")
messages = chunk.data.get('messages', [])
if messages:
print(messages[-1])
输出:
Receiving new event of type: values...
{'content': 'No, actually multiply 3 and 3!', 'type': 'human', ...}
--------------------------------------------------
Receiving new event of type: values...
{'content': '', 'tool_calls': [{'name': 'multiply', 'args': {'a': 3, 'b': 3}, ...}], ...}
--------------------------------------------------
Receiving new event of type: values...
{'content': '9', 'type': 'tool', 'name': 'multiply', ...}
--------------------------------------------------
7. 最终结果
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent",
input=None,
stream_mode="values",
interrupt_before=["assistant"],
):
...
输出:
{'content': 'The result of multiplying 3 by 3 is 9.', 'type': 'ai', ...}
🚀 实战案例 3:等待用户输入节点
设计模式:Human Feedback Node
前面我们是在断点后手动调用 update_state
,现在我们设计一个专门的节点来接收人类反馈。
系统架构
START
↓
[human_feedback] <─┐ (断点:等待人工输入)
↓ │
[assistant] │
↓ │
[tools] ──────────┘
核心思路:
human_feedback
是一个"空节点"(no-op),仅作为断点- 在此节点前暂停,等待用户输入
- 使用
update_state(..., as_node="human_feedback")
注入反馈 - 继续执行
代码实现
1. 定义节点
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import MessagesState, START, StateGraph
from langgraph.prebuilt import tools_condition, ToolNode
from langgraph.checkpoint.memory import MemorySaver
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")
# ⭐ 人工反馈节点(空操作)
def human_feedback(state: MessagesState):
pass
# 助手节点
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
重要: human_feedback
节点什么都不做,仅作为暂停点。
2. 构建图
builder = StateGraph(MessagesState)
# 添加节点
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
builder.add_node("human_feedback", human_feedback)
# 定义边
builder.add_edge(START, "human_feedback")
builder.add_edge("human_feedback", "assistant")
builder.add_conditional_edges("assistant", tools_condition)
builder.add_edge("tools", "human_feedback") # ⭐ 工具执行后回到反馈节点
# 编译
memory = MemorySaver()
graph = builder.compile(
interrupt_before=["human_feedback"], # 在 human_feedback 前暂停
checkpointer=memory
)
图的执行流程:
1. START → human_feedback [暂停]
2. (人工输入) → assistant
3. assistant → tools (如果需要调用工具)
4. tools → human_feedback [暂停]
5. (人工确认) → assistant
6. assistant → END
3. 交互式执行
initial_input = {"messages": "Multiply 2 and 3"}
thread = {"configurable": {"thread_id": "5"}}
# 运行到第一个断点
for event in graph.stream(initial_input, thread, stream_mode="values"):
event["messages"][-1].pretty_print()
# 获取用户输入
user_input = input("Tell me how you want to update the state: ")
# ⭐ 作为 human_feedback 节点更新状态
graph.update_state(
thread,
{"messages": user_input},
as_node="human_feedback" # 关键参数
)
# 继续执行
for event in graph.stream(None, thread, stream_mode="values"):
event["messages"][-1].pretty_print()
LangGraph 知识点:as_node 参数
graph.update_state(
thread,
{"messages": user_input},
as_node="human_feedback" # ⭐ 模拟该节点的输出
)
作用:
- 将状态更新视为来自
human_feedback
节点的输出 - 图会认为
human_feedback
节点已执行完毕 - 自动流向下一个节点(
assistant
)
如果不指定 as_node
:
- 状态会被更新,但图不知道从哪个节点继续
- 需要手动管理执行流程
4. 执行结果
用户输入: no, multiply 3 and 3
输出:
================================ Human Message =================================
Multiply 2 and 3
[暂停,等待输入]
================================ Human Message =================================
no, multiply 3 and 3
================================== Ai Message ==================================
Tool Calls:
multiply (call_...)
Args:
a: 3
b: 3
================================= Tool Message =================================
Name: multiply
9
再次继续:
for event in graph.stream(None, thread, stream_mode="values"):
event["messages"][-1].pretty_print()
输出:
================================= Tool Message =================================
Name: multiply
9
================================== Ai Message ==================================
The result of multiplying 3 and 3 is 9.
🎓 核心知识点总结
LangGraph 特有概念
1. 断点机制(Interrupt)
参数 | 说明 | 用途 |
---|---|---|
interrupt_before=["node"] | 在节点执行前暂停 | 人工审批、状态检查 |
interrupt_after=["node"] | 在节点执行后暂停 | 验证结果、调试 |
graph = builder.compile(
interrupt_before=["assistant"], # 执行前暂停
checkpointer=memory
)
2. 状态管理 API
方法 | 说明 | 示例 |
---|---|---|
get_state(config) | 获取当前状态 | state = graph.get_state(thread) |
update_state(config, values) | 更新状态 | graph.update_state(thread, {"messages": msg}) |
update_state(..., as_node="node") | 作为指定节点更新 | graph.update_state(thread, data, as_node="human") |
3. Reducer 机制
add_messages
reducer 的行为:
from langgraph.graph import MessagesState
# MessagesState 内部定义:
class MessagesState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
行为规则:
场景 | 示例 | 结果 |
---|---|---|
新消息(无 ID) | update({"messages": [HumanMessage("Hi")]}) | 追加到列表末尾 |
覆盖消息(有 ID) | update({"messages": [HumanMessage("Hi", id="123")]}) | 替换 ID 为 "123" 的消息 |
删除消息 | update({"messages": [RemoveMessage(id="123")]}) | 删除指定消息 |
4. 会话管理(Thread)
thread = {"configurable": {"thread_id": "1"}}
Thread 的作用:
- 隔离不同用户/会话的状态
- 支持多轮对话
- 追踪执行历史
类比理解:
# 类似于数据库的会话 ID
user_1 = {"configurable": {"thread_id": "user_1"}}
user_2 = {"configurable": {"thread_id": "user_2"}}
# 两个用户的状态完全隔离
graph.stream(input1, user_1) # 用户 1 的会话
graph.stream(input2, user_2) # 用户 2 的会话
5. Checkpointer 持久化
类型 | 适用场景 | 持久化 |
---|---|---|
MemorySaver() | 开发、测试 | 仅内存,重启丢失 |
SqliteSaver() | 本地应用 | 文件存储 |
PostgresSaver() | 生产环境 | 数据库 |
RedisSaver() | 高性能场景 | Redis |
from langgraph.checkpoint.postgres import PostgresSaver
# 生产环境示例
checkpointer = PostgresSaver(connection_string="postgresql://...")
graph = builder.compile(checkpointer=checkpointer)
Python 特有知识点
1. TypedDict vs Pydantic BaseModel
特性 | TypedDict | BaseModel |
---|---|---|
类型检查 | 静态(IDE) | 运行时 |
数据验证 | 无 | 有 |
性能 | 快 | 稍慢 |
用途 | 状态定义 | 数据模型 |
from typing_extensions import TypedDict
from pydantic import BaseModel
# TypedDict - 用于图状态
class MyState(TypedDict):
messages: list
count: int
# BaseModel - 用于结构化输出
class Response(BaseModel):
answer: str
confidence: float
2. 函数文档字符串(Docstring)
def multiply(a: int, b: int) -> int:
"""Multiply a and b.
Args:
a: first integer
b: second integer
Returns:
The product of a and b
Examples:
>>> multiply(2, 3)
6
"""
return a * b
最佳实践:
- 第一行:简短功能描述
Args
:参数说明Returns
:返回值说明Examples
:使用示例(可选)
3. 类型注解(Type Hints)
from typing import Annotated
from langgraph.graph import MessagesState
def assistant(state: MessagesState) -> dict:
# ^^^^^^^^^^^^^^ ^^^^
# 输入类型 输出类型
return {"messages": [...]}
为什么重要?
- IDE 自动补全
- 静态类型检查(mypy)
- 代码可读性
4. async/await 异步编程
LangGraph SDK 使用异步 API:
# 异步函数定义
async def main():
# 等待异步操作
state = await client.threads.get_state(thread_id)
# 异步迭代
async for chunk in client.runs.stream(...):
print(chunk)
# 运行异步函数
import asyncio
asyncio.run(main())
在 Jupyter 中:
# 直接使用 await(Jupyter 自动处理)
state = await client.threads.get_state(thread_id)
💡 最佳实践
1. 何时使用状态编辑?
✅ 适用场景:
- 纠正 AI 误解:用户表达不清,需要人工澄清
- 注入领域知识:AI 缺乏专业知识,专家提供指导
- 动态调整策略:根据中间结果调整执行计划
- 错误恢复:AI 出错,人工修正状态后重试
❌ 不适用场景:
- 简单的参数验证(应在输入前处理)
- 频繁的用户交互(考虑重新设计流程)
- 自动化流程(破坏自动化特性)
2. 断点设置策略
策略 1:关键决策点
# 在 AI 做重要决策前暂停
interrupt_before=["delete_data", "send_email", "make_payment"]
策略 2:分阶段审批
# 多个检查点
interrupt_before=["plan", "execute", "finalize"]
策略 3:动态断点(Studio API)
# 根据运行时条件设置断点
if is_production:
interrupt_before = ["risky_operation"]
else:
interrupt_before = []
client.runs.stream(..., interrupt_before=interrupt_before)
3. 状态更新模式
模式 1:追加反馈
# 不提供 ID,追加新消息
graph.update_state(
thread,
{"messages": [HumanMessage("Additional info: ...")]},
as_node="human_feedback"
)
模式 2:修正消息
# 提供 ID,覆盖原消息
state = graph.get_state(thread)
last_msg = state.values['messages'][-1]
last_msg['content'] = "Corrected message"
graph.update_state(
thread,
{"messages": [last_msg]}, # 保留 ID
as_node="human_feedback"
)
模式 3:部分状态更新
# 只更新特定字段
graph.update_state(
thread,
{"retry_count": 0, "error": None}, # 不影响 messages
as_node="error_handler"
)
4. 用户体验设计
原则 1:清晰的暂停提示
print("⏸ AI is paused. Current plan:")
print(f" - {state.values['planned_action']}")
user_input = input("Approve? (yes/no/edit): ")
原则 2:提供上下文
# 展示完整上下文,帮助用户决策
for msg in state.values['messages']:
print(f"{msg.type}: {msg.content}")
原则 3:支持多种操作
choice = input("Choose: (1) Approve (2) Edit (3) Cancel (4) Restart: ")
if choice == "1":
graph.stream(None, thread) # 继续
elif choice == "2":
new_input = input("Enter correction: ")
graph.update_state(thread, {"messages": new_input})
graph.stream(None, thread)
elif choice == "3":
# 取消执行
pass
elif choice == "4":
# 重新开始(新线程)
thread = {"configurable": {"thread_id": str(uuid.uuid4())}}
5. 错误处理
技巧 1:验证状态更新
try:
graph.update_state(thread, {"messages": user_input})
print("✅ State updated successfully")
except Exception as e:
print(f"❌ Failed to update state: {e}")
技巧 2:状态回滚
# 保存原始状态
original_state = graph.get_state(thread)
try:
graph.update_state(thread, new_data)
result = graph.stream(None, thread)
except Exception as e:
# 恢复原状态
graph.update_state(thread, original_state.values)
print(f"Rolled back due to error: {e}")
技巧 3:超时处理
import asyncio
async def run_with_timeout():
try:
# 等待用户输入,最多 60 秒
user_input = await asyncio.wait_for(
get_user_input_async(),
timeout=60.0
)
graph.update_state(thread, {"messages": user_input})
except asyncio.TimeoutError:
# 超时,使用默认值
graph.update_state(thread, {"messages": "timeout - using default"})
🚀 进阶技巧
1. 多轮反馈循环
# 允许用户多次修正
max_iterations = 3
for i in range(max_iterations):
# 执行到断点
for event in graph.stream(None, thread, stream_mode="values"):
print(event)
# 用户确认
satisfied = input("Satisfied? (yes/no): ")
if satisfied == "yes":
break
# 修正
correction = input("Enter correction: ")
graph.update_state(thread, {"messages": correction}, as_node="human_feedback")
2. 条件断点
def conditional_interrupt(state: MessagesState):
# 只在特定条件下暂停
if state.get("confidence", 1.0) < 0.7:
# 低置信度,需要人工确认
return "human_feedback"
else:
# 高置信度,直接执行
return "execute"
builder.add_conditional_edges("assistant", conditional_interrupt)
3. 协作式状态编辑
# 多个专家同时提供反馈
state = graph.get_state(thread)
expert_1_feedback = "Consider approach A because..."
expert_2_feedback = "I suggest approach B because..."
# 合并反馈
combined_feedback = f"""
Expert 1: {expert_1_feedback}
Expert 2: {expert_2_feedback}
"""
graph.update_state(
thread,
{"messages": [HumanMessage(combined_feedback)]},
as_node="expert_review"
)
4. 版本控制和审计
import json
from datetime import datetime
# 记录每次状态修改
def update_with_audit(thread, new_state, as_node=None):
# 获取修改前状态
before = graph.get_state(thread)
# 执行更新
result = graph.update_state(thread, new_state, as_node=as_node)
# 获取修改后状态
after = graph.get_state(thread)
# 记录审计日志
audit_log = {
"timestamp": datetime.now().isoformat(),
"before": before.values,
"after": after.values,
"changes": new_state,
"node": as_node
}
with open("audit.jsonl", "a") as f:
f.write(json.dumps(audit_log) + "\n")
return result
📊 状态编辑 vs 其他模式
对比:三种人机协作模式
特性 | 审批模式 | 调试模式 | 编辑模式 |
---|---|---|---|
主要用途 | 确认 AI 操作 | 重现/避免错误 | 修正/引导 AI |
状态修改 | 无 | 回滚到历史状态 | 直接修改当前状态 |
交互时机 | 操作前 | 错误后 | 任意时刻 |
技术实现 | 断点 | 历史回溯 | update_state |
适用场景 | 高风险操作 | 故障排查 | 动态调整 |
示例对比
审批模式
# 仅确认,不修改
for event in graph.stream(input, thread):
print(event)
confirm = input("Approve? (yes/no): ")
if confirm == "yes":
graph.stream(None, thread) # 继续执行
else:
# 取消
pass
调试模式
# 回到之前的检查点
history = graph.get_state_history(thread)
for state in history:
print(state.config['checkpoint_id'])
# 回滚到特定检查点
graph.update_state(history[2].config, history[2].values)
编辑模式(本课重点)
# 直接修改状态
state = graph.get_state(thread)
state.values['messages'][-1].content = "Modified"
graph.update_state(thread, state.values)
🎯 实际应用案例
案例 1:客服 Agent 误解修正
# 用户:"我要退货"
# AI 理解:"用户要查询退货政策"
# 暂停在 assistant 前
state = graph.get_state(thread)
print(f"AI 理解:{state.values['intent']}")
# 客服修正
graph.update_state(
thread,
{"intent": "process_return", "clarification": "用户要办理退货,非查询"},
as_node="human_supervisor"
)
# 继续处理
graph.stream(None, thread)
案例 2:研究 Agent 方向调整
# AI 规划:调研 10 个技术方向
# 专家:"只需要关注 LLM 和 RAG"
state = graph.get_state(thread)
original_topics = state.values['research_topics']
print(f"原计划:{original_topics}")
# 精简计划
graph.update_state(
thread,
{"research_topics": ["LLM", "RAG"]},
as_node="expert_review"
)
# 执行精简后的计划
graph.stream(None, thread)
案例 3:代码生成 Agent 需求澄清
# 用户:"写一个排序函数"
# AI:"准备写快速排序..."
# 暂停,让用户明确需求
state = graph.get_state(thread)
clarification = input("需要哪种排序算法?(bubble/quick/merge): ")
performance = input("数据规模?(small/large): ")
# 注入澄清信息
graph.update_state(
thread,
{
"messages": [HumanMessage(f"Use {clarification} sort for {performance} dataset")],
"algorithm": clarification,
"scale": performance
},
as_node="requirement_clarification"
)
graph.stream(None, thread)
🔍 常见问题
Q1: update_state 后需要调用 stream(None) 吗?
需要! update_state
只修改状态,不推进执行。必须调用 stream(None, thread)
才能继续。
# ❌ 错误:状态更新了,但图没继续执行
graph.update_state(thread, {"messages": "new message"})
# ✅ 正确:更新后继续执行
graph.update_state(thread, {"messages": "new message"})
graph.stream(None, thread) # 从当前状态继续
Q2: as_node 参数有什么用?
作用: 指定状态更新来自哪个节点,影响后续执行流。
# 不指定 as_node:状态更新,但图不知道该执行哪个节点
graph.update_state(thread, data)
# 指定 as_node:模拟该节点已执行,自动流向下一个节点
graph.update_state(thread, data, as_node="human_feedback")
# → 图认为 human_feedback 已完成,执行其后续节点
Q3: 断点和 as_node 如何配合?
典型模式:
# 1. 设置断点
graph = builder.compile(interrupt_before=["human_feedback"])
# 2. 执行到断点
graph.stream(input, thread)
# 3. 作为 human_feedback 节点更新状态
graph.update_state(thread, data, as_node="human_feedback")
# 4. 继续执行(跳过 human_feedback,执行下一个节点)
graph.stream(None, thread)
Q4: 如何覆盖消息而非追加?
方法: 保留消息的 id
。
# 追加(无 ID)
graph.update_state(thread, {"messages": [HumanMessage("new")]})
# 覆盖(有 ID)
state = graph.get_state(thread)
msg = state.values['messages'][-1]
msg.content = "modified"
graph.update_state(thread, {"messages": [msg]}) # msg 保留了 ID
Q5: MemorySaver 重启后数据会丢失吗?
会! MemorySaver
只存在内存中,进程重启即丢失。
生产环境方案:
from langgraph.checkpoint.sqlite import SqliteSaver
# 持久化到文件
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
graph = builder.compile(checkpointer=checkpointer)
📖 扩展阅读
🎓 本章总结
核心收获
- 断点机制:通过
interrupt_before/after
在关键节点暂停执行 - 状态管理:使用
get_state
检查、update_state
修改状态 - Reducer 机制:
add_messages
根据消息 ID 决定追加或覆盖 - as_node 参数:模拟节点输出,控制执行流程
- 人工反馈节点:设计专门的"空节点"作为人机交互点
- LangGraph Studio:可视化工具,支持动态断点和状态编辑
设计模式
模式 | 说明 | 适用场景 |
---|---|---|
检查点审批 | 在节点前暂停,等待确认 | 高风险操作 |
反馈注入 | 专门节点接收人类输入 | 多轮协作 |
状态修正 | 直接修改错误状态 | 纠正 AI 误解 |
动态断点 | 根据条件决定是否暂停 | 自适应审批 |
下一步
掌握了状态编辑和人工反馈,你已经具备构建高度可控的 AI Agent 的能力。接下来,可以学习:
- 并行处理:多个 Agent 协同工作
- 子图(Subgraph):模块化复杂流程
- 持久化策略:生产级状态管理
- 高级路由:动态决策和条件执行
状态编辑是人机协作的基石,掌握它将让你的 AI 系统更加智能、可靠、可控!