Module-4 小结和复习:Human-in-the-Loop 精通指南
总结寄语
"当你完成本章学习,你已经掌握了构建可控 AI 系统的核心技术。记住:最强大的 AI 不是那些能独立完成所有任务的系统,而是那些知道何时需要人类介入、如何优雅地等待、并能从人类反馈中学习的系统。你现在拥有的不仅是技术工具,更是一套完整的'可信 AI'设计哲学。在未来的 AI 应用中,Human-in-the-Loop 将成为区分优秀系统和卓越系统的关键标志。"
— 启发自 Alan Turing 对人机协作的前瞻性思考
📚 核心术语表(Module-4 全模块)
| 术语名称 | LangGraph 定义和解读 | Python 定义和说明 | 重要程度 |
|---|---|---|---|
| Human-in-the-Loop | 人机协同设计模式,在 AI 执行关键操作前引入人类判断和反馈 | N/A (架构设计模式) | ⭐⭐⭐⭐⭐ |
| Breakpoint | 图执行过程中的主动暂停点,支持在节点前/后中断等待人工干预 | interrupt_before/after 参数实现 | ⭐⭐⭐⭐⭐ |
| interrupt_before | 编译时设置的静态断点,在指定节点执行前暂停 | graph.compile(interrupt_before=["node"]) | ⭐⭐⭐⭐⭐ |
| NodeInterrupt | 动态断点核心类,在节点内抛出此异常触发条件性中断 | raise NodeInterrupt("原因") | ⭐⭐⭐⭐⭐ |
| Checkpointer | 状态持久化管理器,自动保存执行历史,支持断点恢复和时间旅行 | MemorySaver() 或 PostgresSaver() 等 | ⭐⭐⭐⭐⭐ |
| get_state() | 获取图当前状态的方法,返回 StateSnapshot 对象 | state = graph.get_state(config) | ⭐⭐⭐⭐⭐ |
| stream(None) | 从断点恢复执行的关键用法,None 表示使用当前状态继续 | graph.stream(None, config) | ⭐⭐⭐⭐⭐ |
| update_state() | 修改图状态的方法,支持指定更新来源节点(as_node参数) | graph.update_state(config, values, as_node=...) | ⭐⭐⭐⭐⭐ |
| as_node 参数 | 指定状态更新来自哪个节点,决定后续执行流程 | update_state 的关键字参数 | ⭐⭐⭐⭐⭐ |
| Message ID | 消息唯一标识符,用于消息去重和覆盖,状态编辑核心机制 | BaseMessage 的 id 属性 | ⭐⭐⭐⭐⭐ |
| add_messages | MessagesState 的 reducer,根据 ID 决定追加或覆盖消息 | Annotated[list, add_messages] | ⭐⭐⭐⭐⭐ |
| Streaming | 图执行过程中实时输出结果的技术,支持多种粒度的流式输出 | 生成器或异步生成器实现 | ⭐⭐⭐⭐⭐ |
| stream_mode | 控制流式输出粒度的参数(updates/values/messages) | graph.stream(..., stream_mode="values") | ⭐⭐⭐⭐⭐ |
| astream_events | 异步流式事件 API,支持 Token 级流式输出 | async for event in graph.astream_events(..., version="v2") | ⭐⭐⭐⭐⭐ |
| Time Travel | 浏览、回放和分叉历史状态的能力,用于调试和测试 | 基于 checkpointer 的历史管理 | ⭐⭐⭐⭐⭐ |
| get_state_history() | 获取完整执行历史的方法,返回所有 StateSnapshot | list(graph.get_state_history(config)) | ⭐⭐⭐⭐⭐ |
| 回放(Replaying) | 从历史 checkpoint 重新执行图,不修改状态 | graph.stream(None, checkpoint_config) | ⭐⭐⭐⭐ |
| 分叉(Forking) | 从历史状态创建新执行分支,修改状态后探索替代路径 | update_state() + stream(None) 组合 | ⭐⭐⭐⭐⭐ |
| Thread | 线程配置,用于标识和隔离不同会话,通过 thread_id 区分 | {"configurable": {"thread_id": "1"}} | ⭐⭐⭐⭐⭐ |
| StateSnapshot | 状态快照对象,包含 values、next、config、metadata 等属性 | LangGraph 返回的数据结构 | ⭐⭐⭐⭐ |
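为把上表中的核心 API 串联起来,下面给出一个最小示意(其中的状态字段 value 与节点名 step 均为本示例假设,并非教程前文的代码):
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
    value: str
def step(state: State):
    return {"value": state["value"] + " -> step"}
builder = StateGraph(State)
builder.add_node("step", step)
builder.add_edge(START, "step")
builder.add_edge("step", END)
# 静态断点 + checkpointer:在 step 执行前暂停
graph = builder.compile(interrupt_before=["step"], checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "demo"}}
for event in graph.stream({"value": "start"}, config):  # 执行到断点
    print(event)
print(graph.get_state(config).next)  # ('step',):下一个待执行节点
graph.update_state(config, {"value": "edited"})  # 人工修改状态
for event in graph.stream(None, config):  # None 表示从断点继续
    print(event)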
📋 本章核心知识回顾
学习地图
graph TB
A[Human-in-the-Loop 体系] --> B[Breakpoints]
A --> C[Dynamic Breakpoints]
A --> D[Edit State]
A --> E[Streaming]
A --> F[Time Travel]
B --> B1[interrupt_before/after]
B --> B2[Checkpointer]
B --> B3[stream继续执行]
C --> C1[NodeInterrupt]
C --> C2[条件性中断]
C --> C3[信息传递]
D --> D1[update_state]
D --> D2[as_node参数]
D --> D3[Message ID覆盖]
E --> E1[stream_mode选择]
E --> E2[Token Streaming]
E --> E3[async/await]
F --> F1[get_state_history]
F --> F2[回放Replaying]
F --> F3[分叉Forking]
style A fill:#e1f5ff
style B fill:#fff4e1
style C fill:#ffe1f5
style D fill:#f5e1ff
style E fill:#e1ffe1
style F fill:#ffe1e1
五大核心技术速查表
| 技术 | 核心API | 主要用途 | 难度 |
|---|---|---|---|
| Breakpoints | interrupt_before/after | 敏感操作审批 | ⭐⭐⭐ |
| Dynamic Breakpoints | NodeInterrupt | 条件性智能中断 | ⭐⭐⭐⭐ |
| Edit State | update_state(as_node=...) | 注入人类反馈 | ⭐⭐⭐⭐ |
| Streaming | astream_events | 实时进度显示 | ⭐⭐⭐⭐⭐ |
| Time Travel | get_state_history + 分叉 | 历史回溯调试 | ⭐⭐⭐⭐⭐ |
🎯 复习题目列表
本章精心设计了 10 道综合性问题,涵盖所有核心知识点。建议按顺序完成,每道题预计耗时 15-30 分钟。
基础理解(问题 1-3)
- 静态断点与动态断点的本质区别是什么?各自适用于什么场景?
- update_state 的 as_node 参数有什么作用?不使用会怎样?
- Streaming 的四种模式(updates/values/astream_events/messages)有什么区别?
实战应用(问题 4-7)
- 如何实现一个支持条件性审批的智能内容审核系统?
- 如何实现类 ChatGPT 的 Token 级流式输出?
- 如何利用 Time Travel 实现 AI 决策的"撤销"功能?
- 设计一个客服 Agent,需要在敏感操作前暂停并修改状态
高级综合(问题 8-10)
- Message ID 在状态编辑和分叉中的作用机制是什么?
- 如何优化 Streaming 性能以支持大规模并发?
- 设计一个完整的 Human-in-the-Loop 系统架构
📚 详细问答解析
问题 1: 静态断点与动态断点的本质区别
核心区别
静态断点(Static Breakpoints):
# 编译时固定设置
graph = builder.compile(
interrupt_before=["tools"], # 总是在 tools 节点前暂停
checkpointer=memory
)
动态断点(Dynamic Breakpoints):
# 运行时条件触发
from langgraph.errors import NodeInterrupt
def content_check(state):
risk_score = calculate_risk(state['content'])
if risk_score > 0.7: # 只在高风险时中断
raise NodeInterrupt(f"风险评分: {risk_score},需要审核")
return state
对比表
| 维度 | 静态断点 | 动态断点 |
|---|---|---|
| 设置时机 | 编译时(compile()) | 运行时(节点内部) |
| 触发方式 | 固定节点,必然触发 | 条件判断,选择性触发 |
| 信息传递 | 无法传递中断原因 | 可传递详细信息 |
| 灵活性 | 低(每次都中断) | 高(智能判断) |
| 实现方式 | interrupt_before/after | raise NodeInterrupt |
| 适用场景 | 固定审批点 | 智能审核 |
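补充一点:动态断点触发后,中断原因可以从状态快照的 tasks 中读出。下面是一个小示意(沿用上文的 content_check 节点,graph 与 config 的构建从略,属于假设性示例):
# NodeInterrupt 触发后,图会暂停在 content_check
for event in graph.stream({"content": user_content}, config):
    print(event)
state = graph.get_state(config)
if state.tasks and state.tasks[0].interrupts:
    # interrupts[0].value 即 raise NodeInterrupt(...) 时传入的内容
    print("中断原因:", state.tasks[0].interrupts[0].value)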
适用场景
静态断点适用场景:
- 固定审批流程
# 所有支付操作都需要人工确认
graph = builder.compile(
interrupt_before=["payment_tool"],
checkpointer=memory
)
- 调试模式
# 开发时在每个节点后暂停检查
if DEBUG_MODE:
graph = builder.compile(
interrupt_after=["assistant", "tools", "router"],
checkpointer=memory
)
- 用户确认
# 关键操作前总是确认
interrupt_before=["delete_data", "send_email", "deploy"]动态断点适用场景:
- 内容审核
def content_moderation(state):
result = ai_moderation_api(state['content'])
# 高风险:立即拒绝
if result.score > 0.9:
raise NodeInterrupt(f"严重违规,已拒绝。问题: {result.issues}")
# 中风险:人工审核
if result.score > 0.5:
raise NodeInterrupt(f"需要审核。可疑点: {result.warnings}")
# 低风险:自动通过
return state
- 资源限制
def resource_check(state):
if len(state['input']) > 10000:
raise NodeInterrupt("输入过长,需要确认是否处理")
if state['estimated_cost'] > 100:
raise NodeInterrupt(f"预估成本 ${state['estimated_cost']},需要批准")
return state
- 异常检测
def anomaly_detection(state):
if detect_unusual_pattern(state):
raise NodeInterrupt(
f"检测到异常模式: {get_anomaly_description(state)}"
)
return state
组合使用示例
# 静态断点:所有工具调用前暂停
graph = builder.compile(
interrupt_before=["tools"],
checkpointer=memory
)
# 动态断点:在工具节点内部根据工具类型判断
def tools_with_dynamic_check(state):
tool_call = state["messages"][-1].tool_calls[0]
# 高风险工具需要额外审批
if tool_call['name'] in ['delete', 'payment']:
raise NodeInterrupt(
f"高风险工具: {tool_call['name']},需要二次确认"
)
# 执行工具
return execute_tool(tool_call)
最佳实践
选择标准:
- 固定流程 → 静态断点
- 智能判断 → 动态断点
- 多级审批 → 组合使用
性能考虑:
- 静态断点开销小(编译时设置)
- 动态断点需要运行时判断(但更灵活)
问题 2: update_state 的 as_node 参数作用
核心作用
as_node 参数告诉 LangGraph:这个状态更新来自哪个节点,从而决定下一步执行哪个节点。
有无 as_node 的区别
不使用 as_node(错误):
# ❌ 状态更新了,但图不知道从哪继续
graph.update_state(thread, {"messages": [HumanMessage("新消息")]})
# 调用 stream(None, thread) 会怎样?
# → 图不知道下一个节点是什么,可能出错或重复执行
使用 as_node(正确):
# ✅ 模拟 human_feedback 节点的输出
graph.update_state(
thread,
{"messages": [HumanMessage("新消息")]},
as_node="human_feedback" # 告诉图:这是 human_feedback 节点的输出
)
# 调用 stream(None, thread)
# → 图认为 human_feedback 已完成,执行其后续节点
工作原理
图结构示例:
START → [human_feedback] → [assistant] → [tools] → END
使用 as_node 的执行流:
# 1. 图暂停在 human_feedback 前
for event in graph.stream(input, thread):
print(event)
# 2. 作为 human_feedback 节点更新状态
graph.update_state(
thread,
{"messages": user_feedback},
as_node="human_feedback"
)
# 3. 继续执行(从 assistant 开始)
for event in graph.stream(None, thread):
print(event) # human_feedback → assistant → tools → END
不使用 as_node 的混乱:
# 1. 暂停在 human_feedback 前
# 2. 只更新状态,不指定节点
graph.update_state(thread, {"messages": user_feedback})
# 3. 继续执行时
# → 图可能从头开始
# → 或重复执行 human_feedback
# → 或出现不可预测的行为
实际应用示例
场景 1:人工反馈节点
class State(TypedDict):
messages: list
user_input: str
def human_feedback(state):
"""空节点,仅作为暂停点"""
pass
def assistant(state):
return {"messages": [llm.invoke(state["messages"])]}
# 构建图
builder = StateGraph(State)
builder.add_node("human_feedback", human_feedback)
builder.add_node("assistant", assistant)
builder.add_edge(START, "human_feedback")
builder.add_edge("human_feedback", "assistant")
graph = builder.compile(
interrupt_before=["human_feedback"],
checkpointer=memory
)
# 执行流程
thread = {"configurable": {"thread_id": "1"}}
# 暂停在 human_feedback 前
for event in graph.stream({"messages": []}, thread):
print(event)
# 用户输入
user_input = input("你的问题: ")
# 作为 human_feedback 节点更新
graph.update_state(
thread,
{"messages": [HumanMessage(user_input)]},
as_node="human_feedback" # ⭐ 关键
)
# 继续执行(跳过 human_feedback,执行 assistant)
for event in graph.stream(None, thread):
print(event)
场景 2:错误修正节点
def error_handler(state):
"""检查错误并修正"""
if state.get("error"):
# 需要人工修正
raise NodeInterrupt("发现错误,需要修正")
return state
# 错误发生
for event in graph.stream(input, thread):
print(event)
# 人工修正错误
corrected_data = fix_error(state.values)
graph.update_state(
thread,
corrected_data,
as_node="error_handler" # 模拟错误已修正
)
# 继续执行
for event in graph.stream(None, thread):
print(event)
进阶用法:控制执行流
跳过某些节点:
# 图结构: A → B → C → D
# 想跳过 B 和 C,直接从 D 开始
graph.update_state(
thread,
new_state,
as_node="C" # 假装 C 已执行
)
# 继续执行 → 直接执行 D
重新执行某个节点:
# 获取历史状态
history = list(graph.get_state_history(thread))
before_node_b = history[-3]
# 修改状态,但指定为 A 的输出
graph.update_state(
before_node_b.config,
modified_state,
as_node="A"
)
# 继续执行 → 重新执行 B
常见错误
错误 1:忘记使用 as_node
# ❌ 错误
graph.update_state(thread, {"messages": [HumanMessage("hi")]})
graph.stream(None, thread) # 可能出错
# ✅ 正确
graph.update_state(thread, {"messages": [HumanMessage("hi")]}, as_node="human")
graph.stream(None, thread)
错误 2:as_node 指定了不存在的节点
# ❌ 错误:节点名拼写错误
graph.update_state(thread, data, as_node="assitant") # 拼写错误
# ✅ 正确
graph.update_state(thread, data, as_node="assistant")
错误 3:与图结构不匹配
# 图结构: A → B → C
# ❌ 错误:C 后面没有节点
graph.update_state(thread, data, as_node="C")
graph.stream(None, thread) # 无法继续
# ✅ 正确:选择合适的节点
graph.update_state(thread, data, as_node="B")
graph.stream(None, thread) # 继续执行 C
问题 3: Streaming 四种模式的区别
四种模式对比
| 模式 | 粒度 | 输出内容 | 数据量 | 典型用途 |
|---|---|---|---|---|
| updates | 节点级 | 状态增量更新 | 小 | 追踪状态变化 |
| values | 节点级 | 完整状态快照 | 大 | 调试、状态检查 |
| astream_events | 事件级 | 所有执行事件 | 最大 | Token streaming |
| messages (API) | Token 级 | 消息流 | 中 | 聊天应用 |
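进入各模式细节前,可以先用两次调用直观感受前两种模式输出形态的差异(示意代码,graph、input 沿用后文约定;thread_1、thread_2 为假设的两个独立线程配置,避免消息累积):
# updates:每个元素形如 {节点名: 增量更新}
for chunk in graph.stream(input, thread_1, stream_mode="updates"):
    print(list(chunk.keys()))  # 例如 ['assistant']
# values:每个元素是完整状态,消息列表不断变长
for event in graph.stream(input, thread_2, stream_mode="values"):
    print(len(event["messages"]))  # 例如 1, 2, 3, ...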
模式 1: updates
特点: 只返回节点对状态的增量更新
for chunk in graph.stream(input, thread, stream_mode="updates"):
print(chunk)
输出示例:
{
'assistant': {
'messages': [AIMessage(content='Hello!', id='run-123')]
}
}
{
'tools': {
'messages': [ToolMessage(content='6', name='multiply')]
}
}
输出结构:
{
'节点名': {
'更新的字段': 新值
}
}
适用场景:
- 只关心状态变化,不需要完整状态
- 减少网络传输数据量
- 追踪每个节点的贡献
代码示例:
for chunk in graph.stream(input, thread, stream_mode="updates"):
for node_name, update in chunk.items():
print(f"[{node_name}] 更新了:")
for field, value in update.items():
print(f" {field}: {value}")模式 2: values
特点: 返回每个节点执行后的完整状态
for event in graph.stream(input, thread, stream_mode="values"):
print(event)
输出示例:
# 第一次输出(初始状态)
{
'messages': [HumanMessage(content='Multiply 2 and 3')]
}
# 第二次输出(assistant 执行后)
{
'messages': [
HumanMessage(content='Multiply 2 and 3'),
AIMessage(content='', tool_calls=[...])
]
}
# 第三次输出(tools 执行后)
{
'messages': [
HumanMessage(content='Multiply 2 and 3'),
AIMessage(content='', tool_calls=[...]),
ToolMessage(content='6', name='multiply')
]
}
适用场景:
- 调试:查看每一步的完整状态
- 状态检查:验证数据是否正确
- 理解执行流程
代码示例:
for event in graph.stream(input, thread, stream_mode="values"):
print(f"当前消息数: {len(event['messages'])}")
for msg in event['messages']:
msg.pretty_print()
print("=" * 50)模式 3: astream_events
特点: 输出图执行过程中的所有事件,支持 Token 级流式输出
async for event in graph.astream_events(input, thread, version="v2"):
print(event)
事件类型:
# 节点开始
{"event": "on_chain_start", "name": "assistant"}
# 提示词处理
{"event": "on_prompt_start", "name": "ChatPromptTemplate"}
# LLM 开始
{"event": "on_chat_model_start", "name": "ChatOpenAI"}
# Token 流(⭐ 最重要)
{"event": "on_chat_model_stream", "data": {"chunk": AIMessageChunk(content="The")}}
{"event": "on_chat_model_stream", "data": {"chunk": AIMessageChunk(content=" result")}}
{"event": "on_chat_model_stream", "data": {"chunk": AIMessageChunk(content=" is")}}
# LLM 结束
{"event": "on_chat_model_end", "data": {"output": AIMessage(...)}}Token Streaming 实现:
async for event in graph.astream_events(input, thread, version="v2"):
# 只处理 Token 流事件
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
print(token, end="", flush=True)
过滤特定节点的 Token:
target_node = "assistant"
async for event in graph.astream_events(input, thread, version="v2"):
if (event["event"] == "on_chat_model_stream" and
event["metadata"].get("langgraph_node") == target_node):
token = event["data"]["chunk"].content
print(token, end="", flush=True)
适用场景:
- 实现类 ChatGPT 的打字机效果
- 实时显示 AI 思考过程
- 细粒度的执行监控
模式 4: messages (API 专属)
特点: LangGraph API Server 提供的专门优化模式,自动处理消息差异
# 需要使用 LangGraph API
from langgraph_sdk import get_client
client = get_client(url="http://127.0.0.1:2024")
async for event in client.runs.stream(
thread_id,
assistant_id="agent",
input=input_data,
stream_mode="messages"
):
print(event.event, event.data)
事件类型:
# 元数据
{"event": "metadata", "data": {"run_id": "..."}}
# 完整消息
{"event": "messages/complete", "data": [{"content": "Hello", "role": "user"}]}
# 部分消息(Token 流)
{"event": "messages/partial", "data": [{"content": "The"}]}
{"event": "messages/partial", "data": [{"content": "The result"}]}
{"event": "messages/partial", "data": [{"content": "The result is"}]}
# 消息元数据
{"event": "messages/metadata", "data": {"finish_reason": "stop"}}适用场景:
- 生产级聊天应用
- 与前端框架集成
- 更高效的网络传输
对比本地 astream_events:
| 特性 | astream_events | messages (API) |
|---|---|---|
| 可用性 | 本地图 | 仅 API Server |
| Token 流 | 需要过滤事件 | 自动处理 |
| 消息差异 | 手动计算 | 自动计算 |
| 网络优化 | 无 | 有 |
模式选择指南
# 场景 1:追踪状态变化
for chunk in graph.stream(input, thread, stream_mode="updates"):
log_state_changes(chunk)
# 场景 2:调试和开发
for event in graph.stream(input, thread, stream_mode="values"):
debug_full_state(event)
# 场景 3:聊天应用(本地)
async for event in graph.astream_events(input, thread, version="v2"):
if event["event"] == "on_chat_model_stream":
display_token(event["data"]["chunk"].content)
# 场景 4:聊天应用(API,推荐)
async for event in client.runs.stream(..., stream_mode="messages"):
if event.event == "messages/partial":
display_token(event.data[0]["content"])
性能考虑
| 模式 | 数据量 | 网络开销 | CPU 开销 |
|---|---|---|---|
| updates | 最小 | 最小 | 最小 |
| values | 大 | 大 | 小 |
| astream_events | 最大 | 最大 | 中 |
| messages (API) | 中 | 中 | 小(服务器端) |
优化建议:
- 生产环境优先使用 updates 或 messages(API)
- 调试时使用 values
- Token streaming 使用 messages(API)或优化后的 astream_events
问题 4: 实现条件性审批的智能内容审核系统
需求分析
构建一个内容审核系统,根据风险评分智能决定是否需要人工审核:
- 高风险(> 0.9):自动拒绝
- 中风险(0.5-0.9):人工审核
- 低风险(< 0.5):自动通过
完整实现
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from typing import TypedDict, Literal
import json
# 定义状态
class ContentState(TypedDict):
content: str
risk_score: float
issues: list[str]
review_decision: str # "approve", "reject", "modify"
status: str
# 风险评估节点
def assess_risk(state: ContentState):
"""使用 AI 模型评估内容风险"""
content = state['content']
# 调用内容审核 API(示例)
result = moderation_api.analyze(content)
return {
"risk_score": result.score,
"issues": result.detected_issues
}
# 动态断点节点
def conditional_approval(state: ContentState):
"""根据风险评分决定是否需要人工审核"""
score = state['risk_score']
issues = state.get('issues', [])
# 高风险:自动拒绝
if score > 0.9:
return {
"status": "auto_rejected",
"review_decision": "reject"
}
# 中风险:触发人工审核
if score > 0.5:
# 准备详细信息
interrupt_info = {
"risk_score": score,
"issues": issues,
"content_preview": state['content'][:200],
"recommendation": "建议人工审核"
}
# 抛出 NodeInterrupt
raise NodeInterrupt(json.dumps(interrupt_info, ensure_ascii=False))
# 低风险:自动通过
return {
"status": "auto_approved",
"review_decision": "approve"
}
# 执行决策节点
def execute_decision(state: ContentState):
"""根据审核决策执行相应操作"""
decision = state.get('review_decision')
if decision == "approve":
return {"status": "approved", "message": "内容已发布"}
elif decision == "reject":
return {"status": "rejected", "message": "内容已拒绝"}
elif decision == "modify":
# 可以要求用户修改
return {"status": "needs_modification"}
else:
return {"status": "pending"}
# 构建图
builder = StateGraph(ContentState)
builder.add_node("assess_risk", assess_risk)
builder.add_node("conditional_approval", conditional_approval)
builder.add_node("execute_decision", execute_decision)
builder.add_edge(START, "assess_risk")
builder.add_edge("assess_risk", "conditional_approval")
builder.add_edge("conditional_approval", "execute_decision")
builder.add_edge("execute_decision", END)
# 编译(需要 checkpointer 支持动态断点)
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# 执行流程
def moderate_content(content: str, thread_id: str):
"""内容审核完整流程"""
thread = {"configurable": {"thread_id": thread_id}}
# 1. 提交内容审核
input_data = {"content": content}
for event in graph.stream(input_data, thread, stream_mode="values"):
print(f"状态: {event.get('status', 'processing')}")
# 2. 检查是否需要人工审核
state = graph.get_state(thread)
if state.tasks and state.tasks[0].interrupts:
# 解析中断信息
interrupt_info = json.loads(state.tasks[0].interrupts[0].value)
print("\n⚠️ 需要人工审核:")
print(f" 风险评分: {interrupt_info['risk_score']}")
print(f" 检测到的问题: {', '.join(interrupt_info['issues'])}")
print(f" 内容预览: {interrupt_info['content_preview']}")
# 3. 获取人工决策
print("\n请选择操作:")
print(" 1. 批准 (approve)")
print(" 2. 拒绝 (reject)")
print(" 3. 要求修改 (modify)")
choice = input("选择 (1/2/3): ")
decision_map = {"1": "approve", "2": "reject", "3": "modify"}
decision = decision_map.get(choice, "reject")
# 4. 更新决策并继续
graph.update_state(
thread,
{"review_decision": decision},
as_node="conditional_approval"
)
# 5. 继续执行
for event in graph.stream(None, thread, stream_mode="values"):
print(f"最终状态: {event.get('status')}")
print(f"消息: {event.get('message')}")
else:
# 自动处理(无需人工审核)
print(f"✅ 自动处理: {state.values.get('status')}")
# 使用示例
if __name__ == "__main__":
# 测试 1:低风险内容(自动通过)
moderate_content("这是一篇关于技术的文章", "thread_1")
# 测试 2:中风险内容(需要人工审核)
moderate_content("这篇文章包含一些争议性观点", "thread_2")
# 测试 3:高风险内容(自动拒绝)
moderate_content("包含严重违规内容", "thread_3")进阶优化
1. 多级风险评分:
def advanced_risk_assessment(state):
"""多维度风险评估"""
content = state['content']
# 多个维度评分
violence_score = check_violence(content)
hate_speech_score = check_hate_speech(content)
spam_score = check_spam(content)
# 综合评分
max_score = max(violence_score, hate_speech_score, spam_score)
# 详细问题列表
issues = []
if violence_score > 0.5:
issues.append(f"暴力内容 ({violence_score:.2f})")
if hate_speech_score > 0.5:
issues.append(f"仇恨言论 ({hate_speech_score:.2f})")
if spam_score > 0.5:
issues.append(f"垃圾信息 ({spam_score:.2f})")
return {
"risk_score": max_score,
"issues": issues,
"violence_score": violence_score,
"hate_speech_score": hate_speech_score,
"spam_score": spam_score
}
2. 审批历史记录:
from datetime import datetime
审批_log = []
def log_approval_decision(thread_id, content, score, decision, reviewer):
"""记录审批历史"""
审批_log.append({
"timestamp": datetime.now().isoformat(),
"thread_id": thread_id,
"content_hash": hash(content),
"risk_score": score,
"decision": decision,
"reviewer": reviewer
})
# 在人工审核后调用
log_approval_decision(
thread_id,
state['content'],
state['risk_score'],
decision,
reviewer_id="admin_001"
)
3. 自动学习阈值:
class AdaptiveThreshold:
"""自适应风险阈值"""
def __init__(self, initial_threshold=0.5):
self.threshold = initial_threshold
self.approval_history = []
def update(self, score, human_decision):
"""根据人工决策调整阈值"""
self.approval_history.append((score, human_decision))
# 如果低分内容被拒绝,降低阈值
if score < self.threshold and human_decision == "reject":
self.threshold = max(0.3, self.threshold - 0.05)
# 如果高分内容被批准,提高阈值
if score > self.threshold and human_decision == "approve":
self.threshold = min(0.8, self.threshold + 0.05)
def should_review(self, score):
"""判断是否需要人工审核"""
return score > self.threshold
# 使用
threshold_manager = AdaptiveThreshold()
def smart_conditional_approval(state):
score = state['risk_score']
if threshold_manager.should_review(score):
raise NodeInterrupt(f"风险评分 {score} 超过阈值 {threshold_manager.threshold}")
return {"status": "auto_approved"}问题 5: 实现类 ChatGPT 的 Token 级流式输出
方案对比
| 方案 | 环境 | 复杂度 | 推荐度 |
|---|---|---|---|
| astream_events | 本地 | 中 | ⭐⭐⭐⭐ |
| messages (API) | LangGraph Server | 低 | ⭐⭐⭐⭐⭐ |
方案 1: 本地 astream_events
import asyncio
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
# 定义模型
llm = ChatOpenAI(model="gpt-4", streaming=True) # 启用 streaming
# 定义节点
def chatbot(state: MessagesState):
return {"messages": [llm.invoke(state["messages"])]}
# 构建图
builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)
graph = builder.compile(checkpointer=MemorySaver())
# Token Streaming 实现
async def stream_chat_response(user_message: str, thread_id: str):
"""流式输出聊天响应"""
from langchain_core.messages import HumanMessage
config = {"configurable": {"thread_id": thread_id}}
input_data = {"messages": [HumanMessage(content=user_message)]}
print(f"用户: {user_message}")
print("AI: ", end="", flush=True)
full_response = ""
async for event in graph.astream_events(input_data, config, version="v2"):
# 过滤 Token 流事件
if (event["event"] == "on_chat_model_stream" and
event["metadata"].get("langgraph_node") == "chatbot"):
token = event["data"]["chunk"].content
if token:
print(token, end="", flush=True)
full_response += token
print("\n") # 换行
return full_response
# 使用
if __name__ == "__main__":
asyncio.run(stream_chat_response(
"什么是 LangGraph?",
"user_123"
))
输出效果:
用户: 什么是 LangGraph?
AI: LangGraph 是一个用于构建有状态的、多步骤的 AI 应用程序的框架...
方案 2: LangGraph API messages 模式(推荐)
启动 API Server:
cd your_project
langgraph dev
客户端实现:
from langgraph_sdk import get_client
from langchain_core.messages import HumanMessage
async def stream_chat_api(user_message: str):
"""使用 API 的 messages 模式流式输出"""
# 连接到 LangGraph Server
client = get_client(url="http://127.0.0.1:2024")
# 创建线程
thread = await client.threads.create()
print(f"用户: {user_message}")
print("AI: ", end="", flush=True)
full_response = ""
async for event in client.runs.stream(
thread["thread_id"],
assistant_id="chatbot",
input={"messages": [HumanMessage(content=user_message)]},
stream_mode="messages"
):
# 处理部分消息(Token 流)
if event.event == "messages/partial":
for item in event.data:
if "content" in item and item.get("type") != "human":
# 提取新增的 Token
content = item["content"]
# 计算增量(与之前的响应对比)
if len(content) > len(full_response):
new_token = content[len(full_response):]
print(new_token, end="", flush=True)
full_response = content
print("\n")
return full_response
# 使用
asyncio.run(stream_chat_api("解释什么是 Token Streaming"))
优化技巧
1. 批量输出(减少 I/O):
async def batched_stream(user_message: str, batch_size=3):
"""批量输出 Token"""
buffer = []
async for event in graph.astream_events(...):
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
buffer.append(token)
# 缓冲区满时输出
if len(buffer) >= batch_size:
print("".join(buffer), end="", flush=True)
buffer.clear()
# 输出剩余 Token
if buffer:
print("".join(buffer), end="", flush=True)2. 添加打字机效果:
import asyncio
async def typewriter_effect(user_message: str, delay=0.05):
"""打字机效果"""
async for event in graph.astream_events(...):
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
print(token, end="", flush=True)
await asyncio.sleep(delay) # 延迟
3. 实时显示思考过程:
async def stream_with_progress(user_message: str):
"""显示 AI 思考过程"""
current_node = None
async for event in graph.astream_events(...):
# 节点开始
if event["event"] == "on_chain_start":
node = event["metadata"].get("langgraph_node")
if node and node != current_node:
current_node = node
print(f"\n[{node}] ", end="", flush=True)
# Token 输出
elif event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
print(token, end="", flush=True)
Web 应用集成
FastAPI + SSE (Server-Sent Events):
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
@app.post("/chat/stream")
async def chat_stream(message: str, thread_id: str):
"""SSE 流式聊天端点"""
async def event_generator():
config = {"configurable": {"thread_id": thread_id}}
input_data = {"messages": [HumanMessage(content=message)]}
async for event in graph.astream_events(input_data, config, version="v2"):
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
# SSE 格式
yield {
"event": "token",
"data": token
}
# 发送完成信号
yield {
"event": "done",
"data": "[DONE]"
}
return EventSourceResponse(event_generator())
# 前端使用 EventSource 接收
前端 JavaScript 示例:
const eventSource = new EventSource(`/chat/stream?message=hello&thread_id=123`);
let responseDiv = document.getElementById('response');
eventSource.addEventListener('token', (e) => {
responseDiv.textContent += e.data;
});
eventSource.addEventListener('done', (e) => {
eventSource.close();
console.log('Stream completed');
});
错误处理
async def safe_stream(user_message: str):
"""带错误处理的流式输出"""
try:
full_response = ""
async for event in graph.astream_events(...):
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
print(token, end="", flush=True)
full_response += token
return full_response
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断")
return full_response
except asyncio.TimeoutError:
print("\n\n⚠️ 响应超时")
return full_response
except Exception as e:
print(f"\n\n❌ 错误: {e}")
return full_response
问题 6: 利用 Time Travel 实现"撤销"功能
核心思路
利用 LangGraph 的 checkpointer 自动保存的历史状态,实现"撤销到上一步"的功能。
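其最小原语只有两步:取出历史快照,再用目标 checkpoint 的 config 调用 stream(None, ...) 继续执行(示意,graph 与 thread 沿用前文约定):
history = list(graph.get_state_history(thread))  # history[0] 为最新状态
target = history[2]  # 回退 2 个 checkpoint
for event in graph.stream(None, target.config):  # 从该 checkpoint 向后回放
    print(event)
下文的 UndoableAgent 就是在这个原语之上封装出历史摘要与撤销接口。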
完整实现
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
class UndoableAgent:
"""支持撤销功能的 Agent"""
def __init__(self):
# 定义 LLM
self.llm = ChatOpenAI(model="gpt-4")
# 构建图
builder = StateGraph(MessagesState)
builder.add_node("assistant", self.assistant_node)
builder.add_edge(START, "assistant")
builder.add_edge("assistant", END)
# 使用 checkpointer(必需)
self.memory = MemorySaver()
self.graph = builder.compile(checkpointer=self.memory)
def assistant_node(self, state: MessagesState):
"""助手节点"""
return {"messages": [self.llm.invoke(state["messages"])]}
def execute(self, user_message: str, thread_id: str):
"""执行并返回响应"""
thread = {"configurable": {"thread_id": thread_id}}
input_data = {"messages": [HumanMessage(content=user_message)]}
# 执行图
result = None
for event in self.graph.stream(input_data, thread, stream_mode="values"):
result = event
return result['messages'][-1].content
def undo(self, thread_id: str, steps=1):
"""撤销最近的 N 步"""
thread = {"configurable": {"thread_id": thread_id}}
# 获取历史
history = list(self.graph.get_state_history(thread))
if len(history) <= steps:
print("⚠️ 无法撤销:没有足够的历史记录")
return None
# 回到 N 步之前的状态
target_state = history[steps]
print(f"✅ 已撤销到 {steps} 步之前")
print(f" Checkpoint ID: {target_state.config['configurable']['checkpoint_id'][:8]}...")
return target_state
def get_history_summary(self, thread_id: str):
"""显示历史摘要"""
thread = {"configurable": {"thread_id": thread_id}}
history = list(self.graph.get_state_history(thread))
print(f"\n📜 历史记录(共 {len(history)} 个状态):")
for i, state in enumerate(history):
step_num = len(history) - i - 1
timestamp = state.created_at[11:19] if state.created_at else "N/A" # created_at 为 ISO 8601 字符串,此处截取时分秒
msg_count = len(state.values.get('messages', []))
# 获取最后一条消息的预览
if msg_count > 0:
last_msg = state.values['messages'][-1]
preview = last_msg.content[:50] if hasattr(last_msg, 'content') else str(last_msg)[:50]
else:
preview = "(空)"
print(f" [{step_num}] {timestamp} - {msg_count} 条消息 - {preview}")
return history
def continue_from_checkpoint(self, checkpoint_config):
"""从指定 checkpoint 继续执行"""
# 从历史状态继续
result = None
for event in self.graph.stream(None, checkpoint_config, stream_mode="values"):
result = event
return result
# 使用示例
def demo_undo():
"""演示撤销功能"""
agent = UndoableAgent()
thread_id = "demo_thread"
print("=== 对话开始 ===\n")
# 步骤 1
print("用户: 你好")
response1 = agent.execute("你好", thread_id)
print(f"AI: {response1}\n")
# 步骤 2
print("用户: 我想了解 LangGraph")
response2 = agent.execute("我想了解 LangGraph", thread_id)
print(f"AI: {response2}\n")
# 步骤 3
print("用户: 它有什么优势?")
response3 = agent.execute("它有什么优势?", thread_id)
print(f"AI: {response3}\n")
# 显示历史
history = agent.get_history_summary(thread_id)
# 撤销最近的 2 步
print("\n=== 执行撤销 ===")
target_state = agent.undo(thread_id, steps=2)
if target_state:
# 查看撤销后的状态
print("\n撤销后的消息:")
for msg in target_state.values['messages']:
print(f" - {msg.content[:50]}")
# 从撤销点继续新的对话
print("\n=== 从撤销点继续新对话 ===")
# 修改输入,创建新分支
new_response = agent.execute("其实我想了解 Python", thread_id)
print(f"用户: 其实我想了解 Python")
print(f"AI: {new_response}")
if __name__ == "__main__":
demo_undo()
高级功能:多级撤销栈
class AdvancedUndoAgent(UndoableAgent):
"""支持多级撤销/重做的 Agent"""
def __init__(self):
super().__init__()
self.undo_stack = [] # 撤销栈
self.redo_stack = [] # 重做栈
def execute(self, user_message: str, thread_id: str):
"""执行并记录到撤销栈"""
# 获取执行前的状态
thread = {"configurable": {"thread_id": thread_id}}
before_state = self.graph.get_state(thread)
# 执行操作
response = super().execute(user_message, thread_id)
# 获取执行后的状态
after_state = self.graph.get_state(thread)
# 保存到撤销栈
self.undo_stack.append({
"before": before_state,
"after": after_state,
"action": user_message
})
# 清空重做栈
self.redo_stack.clear()
return response
def undo_last(self, thread_id: str):
"""撤销最后一次操作"""
if not self.undo_stack:
print("⚠️ 无法撤销:撤销栈为空")
return False
# 弹出最后一次操作
last_action = self.undo_stack.pop()
# 保存到重做栈
self.redo_stack.append(last_action)
# 恢复到之前的状态(通过分叉)
thread = {"configurable": {"thread_id": thread_id}}
self.graph.update_state(
thread,
last_action["before"].values
)
print(f"✅ 已撤销操作: {last_action['action'][:50]}")
return True
def redo_last(self, thread_id: str):
"""重做最后一次撤销的操作"""
if not self.redo_stack:
print("⚠️ 无法重做:重做栈为空")
return False
# 弹出重做栈
action = self.redo_stack.pop()
# 恢复到执行后的状态
thread = {"configurable": {"thread_id": thread_id}}
self.graph.update_state(
thread,
action["after"].values
)
# 保存回撤销栈
self.undo_stack.append(action)
print(f"✅ 已重做操作: {action['action'][:50]}")
return True
用户界面示例
def interactive_chat_with_undo():
"""交互式聊天,支持撤销"""
agent = AdvancedUndoAgent()
thread_id = "interactive_session"
print("🤖 支持撤销的聊天机器人")
print(" 命令: /undo - 撤销, /redo - 重做, /history - 历史, /quit - 退出\n")
while True:
user_input = input("你: ")
# 处理命令
if user_input == "/quit":
break
elif user_input == "/undo":
agent.undo_last(thread_id)
continue
elif user_input == "/redo":
agent.redo_last(thread_id)
continue
elif user_input == "/history":
agent.get_history_summary(thread_id)
continue
# 正常对话
try:
response = agent.execute(user_input, thread_id)
print(f"AI: {response}\n")
except Exception as e:
print(f"❌ 错误: {e}\n")
if __name__ == "__main__":
interactive_chat_with_undo()
时间线可视化
def visualize_timeline(agent, thread_id):
"""可视化对话时间线"""
thread = {"configurable": {"thread_id": thread_id}}
history = list(agent.graph.get_state_history(thread))
print("\n📊 对话时间线:")
print("=" * 60)
for i, state in enumerate(reversed(history)):
step_num = len(history) - i - 1
indent = " " * (step_num % 3) # 缩进显示层次
# 获取消息数
msg_count = len(state.values.get('messages', []))
# 显示
print(f"{indent}[{step_num}] {msg_count} 条消息")
# 显示最后一条消息
if msg_count > 0:
last_msg = state.values['messages'][-1]
if hasattr(last_msg, 'content'):
preview = last_msg.content[:40]
print(f"{indent} └─ {preview}...")
# 分支标记
if i < len(history) - 1:
print(f"{indent} │")
print("=" * 60)问题 7: 设计敏感操作前暂停并修改状态的客服 Agent
需求分析
构建一个客服 Agent,具备以下能力:
- 在执行敏感操作(如退款、删除订单)前暂停
- 显示操作详情供人工审核
- 允许修改操作参数
- 根据审核结果继续或取消
完整实现
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, List, Literal
from langchain_openai import ChatOpenAI
# 定义工具
def 查询订单(order_id: str) -> dict:
"""查询订单信息"""
return {"order_id": order_id, "status": "已发货", "amount": 299.00}
def 申请退款(order_id: str, amount: float, reason: str) -> dict:
"""申请退款(敏感操作)"""
return {"order_id": order_id, "refund_amount": amount, "status": "退款成功"}
def 删除订单(order_id: str) -> dict:
"""删除订单(敏感操作)"""
return {"order_id": order_id, "status": "已删除"}
# 工具分类
SENSITIVE_TOOLS = ["申请退款", "删除订单"]
# 定义状态
class CustomerServiceState(TypedDict):
messages: List
pending_tool: dict # 待执行的工具
approval_status: Literal["pending", "approved", "rejected"]
modified_params: dict # 修改后的参数
# 构建 Agent
llm = ChatOpenAI(model="gpt-4")
tools = [查询订单, 申请退款, 删除订单]
llm_with_tools = llm.bind_tools(tools)
def assistant(state: CustomerServiceState):
"""助手节点:生成工具调用"""
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
def check_if_sensitive(state: CustomerServiceState):
"""检查是否需要人工审批"""
last_message = state["messages"][-1]
if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
tool_call = last_message.tool_calls[0]
if tool_call['name'] in SENSITIVE_TOOLS:
# 保存待审批的工具调用
return {
"pending_tool": tool_call,
"approval_status": "pending"
}
return {"approval_status": "approved"}
def route_after_check(state: CustomerServiceState):
"""根据审批状态路由"""
if state.get("approval_status") == "pending":
return "wait_for_approval" # 需要审批
else:
return "execute_tool" # 直接执行
def wait_for_approval(state: CustomerServiceState):
"""等待审批节点(空节点,用于 breakpoint)"""
pass
def execute_tool(state: CustomerServiceState):
"""执行工具节点"""
# 获取工具调用(可能已被修改)
if state.get("modified_params"):
# 使用修改后的参数
tool_call = state["pending_tool"].copy()
tool_call['args'] = state["modified_params"]
else:
# 使用原始参数
tool_call = state["messages"][-1].tool_calls[0]
# 执行工具
tool_name = tool_call['name']
tool_args = tool_call['args']
if tool_name == "查询订单":
result = 查询订单(**tool_args)
elif tool_name == "申请退款":
result = 申请退款(**tool_args)
elif tool_name == "删除订单":
result = 删除订单(**tool_args)
# 返回工具结果
from langchain_core.messages import ToolMessage
return {"messages": [ToolMessage(
content=str(result),
tool_call_id=tool_call['id']
)]}
# 构建图
builder = StateGraph(CustomerServiceState)
builder.add_node("assistant", assistant)
builder.add_node("check_if_sensitive", check_if_sensitive)
builder.add_node("wait_for_approval", wait_for_approval)
builder.add_node("execute_tool", execute_tool)
builder.add_edge(START, "assistant")
builder.add_edge("assistant", "check_if_sensitive")
builder.add_conditional_edges(
"check_if_sensitive",
route_after_check,
{
"wait_for_approval": "wait_for_approval",
"execute_tool": "execute_tool"
}
)
builder.add_edge("wait_for_approval", "execute_tool")
builder.add_edge("execute_tool", "assistant")
# 编译(在 wait_for_approval 前中断)
memory = MemorySaver()
graph = builder.compile(
interrupt_before=["wait_for_approval"],
checkpointer=memory
)
# 完整流程
def customer_service_flow():
"""客服系统完整流程"""
from langchain_core.messages import HumanMessage
thread = {"configurable": {"thread_id": "customer_001"}}
# 用户请求退款
print("=== 用户请求 ===")
user_request = "我要申请订单 #12345 的退款,金额 299 元"
print(f"用户: {user_request}\n")
input_data = {"messages": [HumanMessage(content=user_request)]}
# 执行到断点
for event in graph.stream(input_data, thread, stream_mode="values"):
if 'pending_tool' in event:
print("⚠️ 检测到敏感操作,等待审批...")
break
# 获取状态,查看待审批的操作
state = graph.get_state(thread)
pending_tool = state.values.get('pending_tool')
if pending_tool:
print("\n=== 待审批操作 ===")
print(f"工具: {pending_tool['name']}")
print(f"参数: {pending_tool['args']}")
# 人工审核
print("\n=== 审核选项 ===")
print("1. 批准(原参数)")
print("2. 批准(修改参数)")
print("3. 拒绝")
choice = input("\n选择 (1/2/3): ")
if choice == "1":
# 批准,使用原参数
print("\n✅ 已批准,执行中...")
for event in graph.stream(None, thread, stream_mode="values"):
if 'messages' in event:
last_msg = event['messages'][-1]
if hasattr(last_msg, 'content'):
print(f"结果: {last_msg.content}")
elif choice == "2":
# 修改参数后批准
print("\n📝 修改参数:")
new_amount = input(f" 退款金额 (原: {pending_tool['args']['amount']}): ")
new_reason = input(f" 退款原因 (原: {pending_tool['args']['reason']}): ")
# 更新状态
modified_params = pending_tool['args'].copy()
if new_amount:
modified_params['amount'] = float(new_amount)
if new_reason:
modified_params['reason'] = new_reason
graph.update_state(
thread,
{"modified_params": modified_params},
as_node="wait_for_approval"
)
print("\n✅ 已批准(修改后参数),执行中...")
for event in graph.stream(None, thread, stream_mode="values"):
if 'messages' in event:
last_msg = event['messages'][-1]
if hasattr(last_msg, 'content'):
print(f"结果: {last_msg.content}")
else:
# 拒绝
print("\n❌ 已拒绝操作")
# 可以添加拒绝原因到状态
if __name__ == "__main__":
customer_service_flow()
进阶:审批日志系统
from datetime import datetime
import json
class ApprovalLogger:
"""审批日志记录器"""
def __init__(self, log_file="approval_log.jsonl"):
self.log_file = log_file
def log(self, thread_id, tool_call, decision, modifier=None, reviewer=None):
"""记录审批日志"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"thread_id": thread_id,
"tool": tool_call['name'],
"original_args": tool_call['args'],
"modified_args": modifier,
"decision": decision, # "approved", "rejected", "modified"
"reviewer": reviewer or "system"
}
with open(self.log_file, "a") as f:
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
print(f"📝 已记录审批日志: {decision}")
def get_logs(self, thread_id=None):
"""获取审批日志"""
logs = []
with open(self.log_file, "r") as f:
for line in f:
log = json.loads(line)
if thread_id is None or log["thread_id"] == thread_id:
logs.append(log)
return logs
# 在审批流程中使用
logger = ApprovalLogger()
# 记录批准
logger.log(
thread_id="customer_001",
tool_call=pending_tool,
decision="approved",
reviewer="admin_user"
)
# 记录修改后批准
logger.log(
thread_id="customer_001",
tool_call=pending_tool,
decision="modified",
modifier=modified_params,
reviewer="admin_user"
)
问题 8: Message ID 在状态编辑和分叉中的作用机制
核心机制
Message ID 是 LangChain 消息系统的唯一标识符,在 add_messages reducer 中起到关键作用:
Reducer 行为规则:
- 有 ID,ID 相同 → 覆盖(更新)
- 有 ID,ID 不同 → 追加
- 无 ID → 追加(自动生成新 ID)
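这三条规则可以不经过图、直接调用 add_messages 这个 reducer 来验证(最小示意):
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage
left = [HumanMessage(content="Hello", id="msg_001")]
print(add_messages(left, [HumanMessage(content="Hi there", id="msg_001")]))  # 同 ID → 覆盖,仍是 1 条
print(add_messages(left, [HumanMessage(content="Another", id="msg_002")]))  # 不同 ID → 追加,共 2 条
print(add_messages(left, [HumanMessage(content="No ID")]))  # 无 ID → 自动生成 ID 并追加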
消息覆盖机制
from langchain_core.messages import HumanMessage
# 原始消息
original = HumanMessage(content="Hello", id="msg_001")
# 场景 1:覆盖(保留相同 ID)
updated = HumanMessage(content="Hi there", id="msg_001") # 相同 ID
graph.update_state(thread, {"messages": [updated]})
# 结果:original 被 updated 覆盖
# 场景 2:追加(使用不同 ID)
new_msg = HumanMessage(content="Another message", id="msg_002") # 不同 ID
graph.update_state(thread, {"messages": [new_msg]})
# 结果:new_msg 追加到列表末尾
# 场景 3:追加(不提供 ID)
no_id_msg = HumanMessage(content="No ID message") # 无 ID
graph.update_state(thread, {"messages": [no_id_msg]})
# 结果:自动生成 ID,追加到列表末尾
在分叉中的应用
分叉的核心:覆盖历史消息
# 获取历史状态
history = list(graph.get_state_history(thread))
fork_point = history[-2]
# 获取要修改的消息
original_msg = fork_point.values['messages'][0]
print(f"原始消息 ID: {original_msg.id}")
print(f"原始内容: {original_msg.content}")
# 创建分叉:保留 ID,修改内容
forked_msg = HumanMessage(
content="修改后的内容",
id=original_msg.id # ⭐ 关键:保留相同 ID
)
# 更新状态
fork_config = graph.update_state(
fork_point.config,
{"messages": [forked_msg]}
)
# 从分叉继续执行
for event in graph.stream(None, fork_config):
print(event)
如果不保留 ID 会怎样?
# ❌ 错误:不保留 ID
forked_msg = HumanMessage(content="修改后的内容") # 新 ID
graph.update_state(fork_point.config, {"messages": [forked_msg]})
# 结果:
# - 原始消息仍然存在
# - 新消息追加到末尾
# - 消息列表变成: [原始消息, 修改后的内容]
# - 不是真正的"分叉",而是"追加"
实际示例:A/B 测试
def ab_test_prompts(graph, base_checkpoint):
"""A/B 测试不同的用户输入"""
# 获取原始消息
original_msg = base_checkpoint.values['messages'][0]
original_id = original_msg.id
# 版本 A:原始提示
print("=== 版本 A(原始) ===")
for event in graph.stream(None, base_checkpoint.config):
print(event)
# 版本 B:修改提示
print("\n=== 版本 B(修改) ===")
variant_b = HumanMessage(
content="请用简洁的方式回答:" + original_msg.content,
id=original_id # 保留 ID 以覆盖
)
fork_b = graph.update_state(
base_checkpoint.config,
{"messages": [variant_b]}
)
for event in graph.stream(None, fork_b):
print(event)
# 版本 C:完全不同的问题
print("\n=== 版本 C(替代) ===")
variant_c = HumanMessage(
content="换个问题:解释什么是 Time Travel",
id=original_id # 保留 ID 以覆盖
)
fork_c = graph.update_state(
base_checkpoint.config,
{"messages": [variant_c]}
)
for event in graph.stream(None, fork_c):
print(event)
Message ID 生成规则
from langchain_core.messages import HumanMessage
# 自动生成 ID(UUID)
msg1 = HumanMessage(content="Hello")
print(msg1.id) # 输出: run-abc123...
# 手动指定 ID
msg2 = HumanMessage(content="Hello", id="custom_id_001")
print(msg2.id) # 输出: custom_id_001
# 复制消息时保留 ID
msg3 = msg1.copy()
print(msg3.id == msg1.id) # True
# 修改内容时可以选择保留或更换 ID
msg4 = HumanMessage(content="New content", id=msg1.id)
print(msg4.id == msg1.id) # True
高级技巧:批量消息更新
def batch_update_messages(graph, thread, updates):
"""批量更新多条消息"""
state = graph.get_state(thread)
messages = state.values['messages']
# 创建更新映射
updates_map = {update.id: update for update in updates}
# 应用更新
updated_messages = []
for msg in messages:
if msg.id in updates_map:
updated_messages.append(updates_map[msg.id])
else:
updated_messages.append(msg)
# 更新状态
graph.update_state(thread, {"messages": updated_messages})
# 使用
updates = [
HumanMessage(content="修改后的消息 1", id="msg_001"),
HumanMessage(content="修改后的消息 2", id="msg_003")
]
batch_update_messages(graph, thread, updates)
调试技巧:追踪 Message ID
def trace_message_ids(state):
"""追踪消息 ID 变化"""
messages = state.values.get('messages', [])
print("\n📋 消息 ID 追踪:")
for i, msg in enumerate(messages):
msg_type = type(msg).__name__
content_preview = msg.content[:30] if hasattr(msg, 'content') else str(msg)[:30]
print(f" [{i}] {msg_type} | ID: {msg.id[:8]}... | {content_preview}...")
# 在状态更新前后调用
print("更新前:")
trace_message_ids(state_before)
print("\n更新后:")
trace_message_ids(state_after)
问题 9: 优化 Streaming 性能以支持大规模并发
性能瓶颈分析
| 瓶颈点 | 影响 | 优化方向 |
|---|---|---|
| 事件过滤开销 | CPU 密集 | 提前编译过滤条件 |
| 频繁 I/O 输出 | 网络延迟 | 批量缓冲输出 |
| 大量并发连接 | 内存/连接数 | 连接池、限流 |
| Token 逐个传输 | 网络往返 | 批量传输 |
优化方案 1:事件过滤优化
# ❌ 低效:每次都进行复杂条件判断
async for event in graph.astream_events(...):
if (event.get("event") == "on_chat_model_stream" and
event.get("metadata", {}).get("langgraph_node") == "assistant" and
"data" in event and
event["data"].get("chunk") is not None):
token = event["data"]["chunk"].content
if token and len(token) > 0:
print(token, end="")
# ✅ 高效:提前准备过滤条件
TARGET_EVENT = "on_chat_model_stream"
TARGET_NODE = "assistant"
async for event in graph.astream_events(...):
# 快速检查事件类型
if event.get("event") != TARGET_EVENT:
continue
# 快速检查节点
if event.get("metadata", {}).get("langgraph_node") != TARGET_NODE:
continue
# 提取 token
chunk = event.get("data", {}).get("chunk")
if chunk and chunk.content:
print(chunk.content, end="", flush=True)
优化方案 2:批量缓冲输出
import time
class TokenBuffer:
"""Token 缓冲器"""
def __init__(self, buffer_size=5, flush_interval=0.1):
self.buffer_size = buffer_size
self.flush_interval = flush_interval
self.buffer = []
self.last_flush = time.time()
def add(self, token):
"""添加 Token"""
self.buffer.append(token)
# 达到缓冲区大小或超时则刷新
if (len(self.buffer) >= self.buffer_size or
time.time() - self.last_flush >= self.flush_interval):
return self.flush()
return None
def flush(self):
"""刷新缓冲区"""
if not self.buffer:
return None
batch = "".join(self.buffer)
self.buffer.clear()
self.last_flush = time.time()
return batch
# 使用
async def buffered_stream(graph, input_data, config):
"""带缓冲的流式输出"""
buffer = TokenBuffer(buffer_size=5)
async for event in graph.astream_events(input_data, config, version="v2"):
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
batch = buffer.add(token)
if batch:
print(batch, end="", flush=True)
# 刷新剩余 Token
final_batch = buffer.flush()
if final_batch:
print(final_batch, end="", flush=True)
优化方案 3:连接池管理
import asyncio
from asyncio import Semaphore
class ConcurrentStreamManager:
"""并发流管理器"""
def __init__(self, max_concurrent=100):
self.semaphore = Semaphore(max_concurrent)
self.active_streams = {}
async def stream_for_user(self, user_id, graph, input_data, config):
"""为特定用户流式输出(带并发控制)"""
async with self.semaphore: # 限制并发数
try:
self.active_streams[user_id] = {
"status": "streaming",
"started_at": time.time()
}
async for event in graph.astream_events(input_data, config, version="v2"):
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
yield token
self.active_streams[user_id]["status"] = "completed"
except Exception as e:
self.active_streams[user_id]["status"] = "error"
self.active_streams[user_id]["error"] = str(e)
raise
finally:
# 清理
if user_id in self.active_streams:
del self.active_streams[user_id]
# 使用
manager = ConcurrentStreamManager(max_concurrent=50)
async def handle_multiple_users():
"""处理多用户并发"""
tasks = []
for user_id in range(100):
task = asyncio.create_task(
process_user_stream(manager, user_id)
)
tasks.append(task)
# 并发执行(最多 50 个同时进行)
await asyncio.gather(*tasks)
async def process_user_stream(manager, user_id):
"""处理单个用户的流"""
async for token in manager.stream_for_user(
user_id,
graph,
input_data,
config
):
# 处理 token
await send_to_user(user_id, token)
优化方案 4:使用 LangGraph API(推荐)
from langgraph_sdk import get_client
class ProductionStreamService:
"""生产级流式服务"""
def __init__(self, api_url="http://localhost:2024"):
self.client = get_client(url=api_url)
self.connection_pool = {}
async def stream_chat(self, user_id, message, thread_id):
"""流式聊天(使用 messages 模式)"""
try:
full_response = ""
async for event in self.client.runs.stream(
thread_id,
assistant_id="chatbot",
input={"messages": [{"role": "user", "content": message}]},
stream_mode="messages" # API 优化的模式
):
if event.event == "messages/partial":
for item in event.data:
if "content" in item:
content = item["content"]
# 计算增量
if len(content) > len(full_response):
new_tokens = content[len(full_response):]
full_response = content
# 发送增量
yield new_tokens
except asyncio.CancelledError:
# 用户取消
await self.cleanup_stream(thread_id)
raise
async def cleanup_stream(self, thread_id):
"""清理流资源"""
# 取消运行中的请求
pass
# FastAPI 集成
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
service = ProductionStreamService()
@app.websocket("/ws/chat/{user_id}")
async def websocket_chat(websocket: WebSocket, user_id: str):
"""WebSocket 流式聊天"""
await websocket.accept()
try:
while True:
# 接收用户消息
message = await websocket.receive_text()
# 流式响应
async for token in service.stream_chat(
user_id,
message,
thread_id=f"user_{user_id}"
):
await websocket.send_text(token)
# 发送完成标记
await websocket.send_text("[DONE]")
except WebSocketDisconnect:
print(f"User {user_id} disconnected")优化方案 5:缓存和预加载
class CachedStreamService:
"""带缓存的流式服务"""
def __init__(self):
self.response_cache = {}
def get_cached_response(self, message_hash):
"""获取缓存的响应"""
return self.response_cache.get(message_hash)
async def stream_with_cache(self, message):
"""带缓存的流式输出"""
message_hash = hash(message)
# 检查缓存
cached = self.get_cached_response(message_hash)
if cached:
# 模拟流式输出缓存的响应
for token in cached.split():
yield token + " "
await asyncio.sleep(0.05) # 模拟延迟
return
# 生成新响应
full_response = ""
async for event in graph.astream_events(...):
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
full_response += token
yield token
# 缓存响应
self.response_cache[message_hash] = full_response
性能监控
import time
from dataclasses import dataclass
@dataclass
class StreamMetrics:
"""流式输出性能指标"""
user_id: str
start_time: float
end_time: float = None
token_count: int = 0
total_bytes: int = 0
@property
def duration(self):
if self.end_time:
return self.end_time - self.start_time
return time.time() - self.start_time
@property
def tokens_per_second(self):
duration = self.duration
return self.token_count / duration if duration > 0 else 0
class MonitoredStreamService:
"""带监控的流式服务"""
def __init__(self):
self.metrics = []
async def monitored_stream(self, user_id, graph, input_data, config):
"""带性能监控的流式输出"""
metric = StreamMetrics(user_id=user_id, start_time=time.time())
try:
async for event in graph.astream_events(input_data, config, version="v2"):
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
metric.token_count += 1
metric.total_bytes += len(token.encode('utf-8'))
yield token
finally:
metric.end_time = time.time()
self.metrics.append(metric)
# 记录性能
print(f"\n📊 性能指标:")
print(f" 用户: {metric.user_id}")
print(f" 耗时: {metric.duration:.2f}s")
print(f" Token 数: {metric.token_count}")
print(f" 速度: {metric.tokens_per_second:.1f} tokens/s")问题 10: 设计完整的 Human-in-the-Loop 系统架构
系统架构图
┌──────────────────────────────────────────────────────────────┐
│ 客户端层(Client Layer) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Web UI │ │ Mobile │ │ API │ │ CLI Tool │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
└───────┼─────────────┼─────────────┼─────────────┼────────────┘
│ │ │ │
└─────────────┴─────────────┴─────────────┘
│
┌───────────────▼────────────────┐
│ API 网关(Rate Limiting) │
└───────────────┬────────────────┘
│
┌───────────────────────▼────────────────────────────────────────┐
│ 服务层(Service Layer) │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Stream Manager(流管理) │ │
│ │ - 并发控制(Semaphore) │ │
│ │ - 连接池管理 │ │
│ │ - 缓冲优化 │ │
│ └────────────────────────────────────────────────────────┘ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Approval Manager(审批管理) │ │
│ │ - 静态断点 (interrupt_before) │ │
│ │ - 动态断点 (NodeInterrupt) │ │
│ │ - 审批队列 │ │
│ └────────────────────────────────────────────────────────┘ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ State Manager(状态管理) │ │
│ │ - update_state │ │
│ │ - get_state_history │ │
│ │ - 分叉管理 │ │
│ └────────────────────────────────────────────────────────┘ │
└──────────────────────┬─────────────────────────────────────────┘
│
┌──────────────────────▼─────────────────────────────────────────┐
│ LangGraph 层(Graph Layer) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Assistant │ │ Tools │ │ Router │ │ Feedback │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└──────────────────────┬─────────────────────────────────────────┘
│
┌──────────────────────▼─────────────────────────────────────────┐
│ 持久化层(Persistence Layer) │
│ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │
│ │ Checkpointer │ │ Audit Logger │ │ Metrics DB │ │
│ │ (PostgreSQL) │ │ (Elasticsearch)│ │ (Prometheus) │ │
│ └────────────────┘ └────────────────┘ └────────────────┘ │
└────────────────────────────────────────────────────────────────┘
完整实现
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
import asyncio
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.errors import NodeInterrupt
@dataclass
class ApprovalRequest:
"""审批请求"""
request_id: str
thread_id: str
tool_name: str
tool_args: dict
risk_level: str # "low", "medium", "high"
created_at: datetime
status: str # "pending", "approved", "rejected"
class HumanInTheLoopSystem:
"""完整的 Human-in-the-Loop 系统"""
def __init__(
self,
postgres_connection: str,
max_concurrent_streams: int = 100,
buffer_size: int = 5
):
# 组件初始化
self.checkpointer = PostgresSaver.from_conn_string(postgres_connection) # 示意:具体构造方式随 langgraph-checkpoint-postgres 版本而异
self.stream_semaphore = asyncio.Semaphore(max_concurrent_streams)
self.buffer_size = buffer_size
# 审批队列
self.approval_queue: Dict[str, ApprovalRequest] = {}
# 性能指标
self.metrics = {
"total_requests": 0,
"approved": 0,
"rejected": 0,
"auto_approved": 0
}
# 构建图
self.graph = self._build_graph()
def _build_graph(self):
"""构建 LangGraph"""
builder = StateGraph(MessagesState)
# 添加节点
builder.add_node("assistant", self._assistant_node)
builder.add_node("risk_assessment", self._risk_assessment)
builder.add_node("approval_gate", self._approval_gate)
builder.add_node("tools", self._tools_node)
# 添加边
builder.add_edge(START, "assistant")
builder.add_edge("assistant", "risk_assessment")
builder.add_conditional_edges(
"risk_assessment",
self._route_after_risk,
{
"auto_approve": "tools",
"need_approval": "approval_gate"
}
)
builder.add_edge("approval_gate", "tools")
builder.add_edge("tools", "assistant")
# 编译
return builder.compile(
interrupt_before=["approval_gate"],
checkpointer=self.checkpointer
)
def _assistant_node(self, state):
"""助手节点"""
# LLM 调用
pass
def _risk_assessment(self, state):
"""风险评估节点"""
last_message = state["messages"][-1]
if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
tool_call = last_message.tool_calls[0]
# 评估风险
risk_level = self._calculate_risk(tool_call)
return {
"risk_level": risk_level,
"pending_tool": tool_call
}
return {"risk_level": "none"}
def _calculate_risk(self, tool_call) -> str:
"""计算风险等级"""
high_risk_tools = ["delete", "payment", "refund"]
medium_risk_tools = ["update", "modify"]
if tool_call['name'] in high_risk_tools:
return "high"
elif tool_call['name'] in medium_risk_tools:
return "medium"
else:
return "low"
def _route_after_risk(self, state):
"""风险评估后路由"""
risk = state.get("risk_level", "low")
if risk in ["high", "medium"]:
return "need_approval"
else:
self.metrics["auto_approved"] += 1
return "auto_approve"
def _approval_gate(self, state):
"""审批门(空节点,用于断点)"""
pass
def _tools_node(self, state):
"""工具执行节点"""
# 执行工具
pass
async def stream_with_hitl(
self,
user_id: str,
message: str,
thread_id: str
):
"""带 HITL 的流式执行"""
async with self.stream_semaphore: # 并发控制
self.metrics["total_requests"] += 1
# 配置
config = {"configurable": {"thread_id": thread_id}}
input_data = {"messages": [{"role": "user", "content": message}]}
# 缓冲器
buffer = []
try:
async for event in self.graph.astream_events(
input_data,
config,
version="v2"
):
# Token 流
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
buffer.append(token)
# 批量输出
if len(buffer) >= self.buffer_size:
yield "".join(buffer)
buffer.clear()
# 刷新剩余
if buffer:
yield "".join(buffer)
# 检查是否需要审批
state = self.graph.get_state(config)
if state.tasks and state.tasks[0].interrupts:
# 创建审批请求
request = self._create_approval_request(
thread_id,
state
)
# 等待审批
yield f"\n\n⏸️ 等待审批(请求 ID: {request.request_id})"
except Exception as e:
yield f"\n\n❌ 错误: {e}"
def _create_approval_request(
self,
thread_id: str,
state
) -> ApprovalRequest:
"""创建审批请求"""
import uuid
pending_tool = state.values.get('pending_tool')
risk_level = state.values.get('risk_level', 'medium')
request = ApprovalRequest(
request_id=str(uuid.uuid4()),
thread_id=thread_id,
tool_name=pending_tool['name'],
tool_args=pending_tool['args'],
risk_level=risk_level,
created_at=datetime.now(),
status="pending"
)
self.approval_queue[request.request_id] = request
return request
async def approve_request(
self,
request_id: str,
approved: bool,
modified_args: Optional[dict] = None
):
"""审批请求"""
request = self.approval_queue.get(request_id)
if not request:
raise ValueError(f"Request {request_id} not found")
if approved:
request.status = "approved"
self.metrics["approved"] += 1
# 更新状态
config = {"configurable": {"thread_id": request.thread_id}}
if modified_args:
self.graph.update_state(
config,
{"modified_params": modified_args},
as_node="approval_gate"
)
# 继续执行
result = []
async for event in self.graph.astream_events(None, config, version="v2"):
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
result.append(token)
return "".join(result)
else:
request.status = "rejected"
self.metrics["rejected"] += 1
return "操作已拒绝"
def get_metrics(self) -> dict:
"""获取系统指标"""
return {
**self.metrics,
"approval_rate": self.metrics["approved"] / max(self.metrics["total_requests"], 1),
"pending_approvals": len([r for r in self.approval_queue.values() if r.status == "pending"])
}
# FastAPI 集成示例
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
app = FastAPI()
hitl_system = HumanInTheLoopSystem(
postgres_connection="postgresql://user:pass@localhost/db",
max_concurrent_streams=100
)
@app.websocket("/ws/chat/{user_id}")
async def websocket_chat(websocket: WebSocket, user_id: str):
"""WebSocket 聊天端点"""
await websocket.accept()
try:
while True:
message = await websocket.receive_text()
async for token in hitl_system.stream_with_hitl(
user_id,
message,
thread_id=f"user_{user_id}"
):
await websocket.send_text(token)
except WebSocketDisconnect:
pass
@app.post("/approve/{request_id}")
async def approve_request(
request_id: str,
approved: bool,
modified_args: Optional[dict] = None
):
"""审批端点"""
try:
result = await hitl_system.approve_request(
request_id,
approved,
modified_args
)
return {"status": "success", "result": result}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@app.get("/metrics")
async def get_metrics():
"""获取系统指标"""
return hitl_system.get_metrics()
🎉 复习完成!
知识掌握度自测
完成以上 10 道问题后,请评估你的掌握程度:
- [ ] Breakpoints:能独立实现工具调用审批流程
- [ ] Dynamic Breakpoints:能根据业务逻辑条件性中断
- [ ] Edit State:理解 as_node 和 Message ID 机制
- [ ] Streaming:能实现生产级的 Token 流式输出
- [ ] Time Travel:掌握历史浏览、回放和分叉技术
下一步学习路径
根据你的掌握情况,选择下一步方向:
巩固基础(掌握度 < 70%):
- 重新学习未掌握的章节
- 完成每节课的实战练习
- 构建小型示例项目验证理解
进阶学习(掌握度 70%-90%):
- 学习 Module-5:高级图模式(并行、子图、动态路由)
- 学习 Module-6:生产部署与优化
- 构建中型综合项目
专家路径(掌握度 > 90%):
- 学习 Module-7:大型实战项目
- 研究 LangGraph 源码
- 贡献开源项目或撰写技术博客
实践建议
项目 1:智能客服系统(中级)
- 需求:支持订单查询、退款审批、投诉处理
- 技术:Breakpoints + Edit State + Streaming
- 预计时间:3-5 天
项目 2:内容审核平台(高级)
- 需求:多维度风险评估、条件性审批、审批日志
- 技术:Dynamic Breakpoints + 完整审计系统
- 预计时间:5-7 天
项目 3:AI 决策调试工具(专家级)
- 需求:历史浏览、回放、A/B 测试、性能监控
- 技术:Time Travel + 所有 HITL 技术
- 预计时间:7-10 天
参考资源
进阶阅读:
- Python Async/Await 深入教程
- WebSocket 实时通信
- PostgreSQL 性能优化
恭喜你完成 Module-4 的复习! 🎊
你现在已经掌握了构建可控、可信、可靠的 Human-in-the-Loop AI 系统的全部技能。这些技术不仅是工具,更是一套完整的"可信 AI"设计哲学。在未来的 AI 应用开发中,Human-in-the-Loop 将成为区分优秀系统和卓越系统的关键标志。
继续加油,向着精通 LangGraph 的目标前进! 🚀