
Module-4 Review and Summary: A Human-in-the-Loop Mastery Guide

A Closing Note

"By the time you finish this module, you will have mastered the core techniques for building controllable AI systems. Remember: the most powerful AI is not the system that can complete every task on its own, but the one that knows when to bring a human in, how to wait gracefully, and how to learn from human feedback. What you now hold is more than a set of technical tools; it is a complete design philosophy for trustworthy AI. In the AI applications to come, Human-in-the-Loop will be the mark that separates good systems from exceptional ones."

Inspired by Alan Turing's forward-looking thinking on human-machine collaboration


📚 Core Glossary (All of Module-4)

| Term | LangGraph definition | Python definition / notes | Importance |
|---|---|---|---|
| Human-in-the-Loop | Human-machine collaboration pattern that brings human judgment and feedback in before the AI performs critical operations | N/A (architectural design pattern) | ⭐⭐⭐⭐⭐ |
| Breakpoint | Deliberate pause point during graph execution; supports interrupting before/after a node to wait for human intervention | Implemented via the interrupt_before/after parameters | ⭐⭐⭐⭐⭐ |
| interrupt_before | Static breakpoint set at compile time; pauses before the specified node executes | graph.compile(interrupt_before=["node"]) | ⭐⭐⭐⭐⭐ |
| NodeInterrupt | Core class for dynamic breakpoints; raise it inside a node to trigger a conditional interrupt | raise NodeInterrupt("reason") | ⭐⭐⭐⭐⭐ |
| Checkpointer | State persistence manager; automatically saves execution history, enabling resume-from-breakpoint and time travel | MemorySaver(), PostgresSaver() | ⭐⭐⭐⭐⭐ |
| get_state() | Method that fetches the graph's current state; returns a StateSnapshot | state = graph.get_state(config) | ⭐⭐⭐⭐⭐ |
| stream(None) | The key idiom for resuming from a breakpoint; None means "continue from the current state" | graph.stream(None, config) | ⭐⭐⭐⭐⭐ |
| update_state() | Method for modifying graph state; can attribute the update to a node (as_node parameter) | graph.update_state(config, values, as_node=...) | ⭐⭐⭐⭐⭐ |
| as_node parameter | Declares which node a state update comes from, which determines the subsequent execution flow | Keyword argument of update_state | ⭐⭐⭐⭐⭐ |
| Message ID | Unique message identifier used for deduplication and overwriting; the core mechanism behind state editing | The id attribute of BaseMessage | ⭐⭐⭐⭐⭐ |
| add_messages | The reducer of MessagesState; appends or overwrites messages based on ID | Annotated[list, add_messages] | ⭐⭐⭐⭐⭐ |
| Streaming | Emitting results in real time during graph execution, at several levels of granularity | Implemented with generators or async generators | ⭐⭐⭐⭐⭐ |
| stream_mode | Parameter controlling streaming granularity (updates/values/messages) | graph.stream(..., stream_mode="values") | ⭐⭐⭐⭐⭐ |
| astream_events | Async streaming-events API; supports token-level streaming | async for event in graph.astream_events(..., version="v2") | ⭐⭐⭐⭐⭐ |
| Time Travel | The ability to browse, replay, and fork historical states, for debugging and testing | History management built on the checkpointer | ⭐⭐⭐⭐⭐ |
| get_state_history() | Method that returns the full execution history as StateSnapshot objects | list(graph.get_state_history(config)) | ⭐⭐⭐⭐⭐ |
| Replaying | Re-executing the graph from a historical checkpoint without modifying state | graph.stream(None, checkpoint_config) | ⭐⭐⭐⭐ |
| Forking | Creating a new execution branch from a historical state; modify the state, then explore an alternative path | update_state() + stream(None) | ⭐⭐⭐⭐⭐ |
| Thread | Thread configuration used to identify and isolate sessions, distinguished by thread_id | {"configurable": {"thread_id": "1"}} | ⭐⭐⭐⭐⭐ |
| StateSnapshot | State snapshot object with values, next, config, metadata, and other attributes | Data structure returned by LangGraph | ⭐⭐⭐⭐ |
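
A minimal sketch tying several of these terms together (the trivial one-node graph here is an illustrative assumption, not course code): compile with a Checkpointer, run under a Thread config, pause at a Breakpoint, inspect the StateSnapshot, and resume with stream(None):

python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class State(TypedDict):
    count: int

def step(state: State):
    return {"count": state["count"] + 1}

builder = StateGraph(State)
builder.add_node("step", step)
builder.add_edge(START, "step")
builder.add_edge("step", END)

# Compile with a checkpointer and a static breakpoint before "step"
graph = builder.compile(interrupt_before=["step"], checkpointer=MemorySaver())

config = {"configurable": {"thread_id": "demo"}}  # thread config
for event in graph.stream({"count": 0}, config):  # pauses before "step"
    print(event)

snapshot = graph.get_state(config)  # StateSnapshot: values, next, config, ...
print(snapshot.next)                # ('step',), the interrupted node

for event in graph.stream(None, config):  # resume from the breakpoint
    print(event)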

📋 Core Knowledge Review

Learning Map

mermaid
graph TB
    A[Human-in-the-Loop toolkit] --> B[Breakpoints]
    A --> C[Dynamic Breakpoints]
    A --> D[Edit State]
    A --> E[Streaming]
    A --> F[Time Travel]

    B --> B1[interrupt_before/after]
    B --> B2[Checkpointer]
    B --> B3[Resume via stream]

    C --> C1[NodeInterrupt]
    C --> C2[Conditional interrupts]
    C --> C3[Passing interrupt info]

    D --> D1[update_state]
    D --> D2[as_node parameter]
    D --> D3[Overwrite by message ID]

    E --> E1[Choosing stream_mode]
    E --> E2[Token Streaming]
    E --> E3[async/await]

    F --> F1[get_state_history]
    F --> F2[Replaying]
    F --> F3[Forking]

    style A fill:#e1f5ff
    style B fill:#fff4e1
    style C fill:#ffe1f5
    style D fill:#f5e1ff
    style E fill:#e1ffe1
    style F fill:#ffe1e1

Quick Reference: The Five Core Techniques

| Technique | Core API | Main use | Difficulty |
|---|---|---|---|
| Breakpoints | interrupt_before/after | Approval before sensitive operations | ⭐⭐⭐ |
| Dynamic Breakpoints | NodeInterrupt | Conditional, intelligent interrupts | ⭐⭐⭐⭐ |
| Edit State | update_state(as_node=...) | Injecting human feedback | ⭐⭐⭐⭐ |
| Streaming | astream_events | Real-time progress display | ⭐⭐⭐⭐⭐ |
| Time Travel | get_state_history + forking | Debugging via history rollback | ⭐⭐⭐⭐⭐ |
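
And a compact sketch of the Dynamic Breakpoints row (the state shape and the threshold of 100 are made-up illustrations): the node raises NodeInterrupt only when its condition holds, and the reason is readable from the paused task:

python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt

class State(TypedDict):
    amount: float

def guard(state: State):
    # Interrupt only for large amounts; small ones pass through
    if state["amount"] > 100:
        raise NodeInterrupt(f"Amount {state['amount']} needs approval")
    return state

builder = StateGraph(State)
builder.add_node("guard", guard)
builder.add_edge(START, "guard")
builder.add_edge("guard", END)
graph = builder.compile(checkpointer=MemorySaver())

config = {"configurable": {"thread_id": "t1"}}
for event in graph.stream({"amount": 250}, config):
    print(event)

# The interrupt reason is recorded on the pending task,
# e.g. (Interrupt(value='Amount 250 needs approval'),)
snapshot = graph.get_state(config)
print(snapshot.tasks[0].interrupts)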

🎯 Review Questions

This module provides 10 carefully designed, comprehensive questions covering all the core topics. Work through them in order; expect 15-30 minutes per question.

Basic Understanding (Questions 1-3)

  1. What is the essential difference between static and dynamic breakpoints? When is each appropriate?
  2. What does the as_node parameter of update_state do? What happens if you omit it?
  3. How do the four streaming modes (updates/values/astream_events/messages) differ?

Hands-On Application (Questions 4-7)

  4. How would you build a content moderation system with conditional approval?
  5. How do you implement ChatGPT-style token-level streaming?
  6. How can Time Travel provide an "undo" for AI decisions?
  7. Design a customer-service agent that pauses before sensitive operations and allows state edits.

Advanced Synthesis (Questions 8-10)

  8. How does the message ID mechanism work in state editing and forking?
  9. How do you optimize streaming performance for large-scale concurrency?
  10. Design a complete Human-in-the-Loop system architecture.

📚 Detailed Q&A

Question 1: The Essential Difference Between Static and Dynamic Breakpoints

Core Difference

Static breakpoints:

python
# Fixed at compile time
graph = builder.compile(
    interrupt_before=["tools"],  # Always pause before the tools node
    checkpointer=memory
)

Dynamic breakpoints:

python
# Triggered conditionally at run time
from langgraph.errors import NodeInterrupt

def content_check(state):
    risk_score = calculate_risk(state['content'])

    if risk_score > 0.7:  # Interrupt only when the risk is high
        raise NodeInterrupt(f"Risk score: {risk_score}, review required")

    return state

Comparison

| Dimension | Static breakpoint | Dynamic breakpoint |
|---|---|---|
| When set | Compile time (compile()) | Run time (inside a node) |
| Trigger | Fixed node, always fires | Conditional, fires selectively |
| Information | Cannot convey an interrupt reason | Can carry detailed context |
| Flexibility | Low (interrupts every time) | High (intelligent judgment) |
| Mechanism | interrupt_before/after | raise NodeInterrupt |
| Typical use | Fixed approval points | Intelligent review |

Use Cases

Static breakpoints are suited to:

  1. Fixed approval flows
python
# All payment operations require human confirmation
graph = builder.compile(
    interrupt_before=["payment_tool"],
    checkpointer=memory
)
  2. Debug mode
python
# During development, pause after every node for inspection
if DEBUG_MODE:
    graph = builder.compile(
        interrupt_after=["assistant", "tools", "router"],
        checkpointer=memory
    )
  3. User confirmation
python
# Always confirm before critical operations
interrupt_before=["delete_data", "send_email", "deploy"]

Dynamic breakpoints are suited to:

  1. Content moderation
python
def content_moderation(state):
    result = ai_moderation_api(state['content'])

    # High risk: reject immediately
    if result.score > 0.9:
        raise NodeInterrupt(f"Serious violation, rejected. Issues: {result.issues}")

    # Medium risk: human review
    if result.score > 0.5:
        raise NodeInterrupt(f"Review required. Suspect points: {result.warnings}")

    # Low risk: auto-approve
    return state
  2. Resource limits
python
def resource_check(state):
    if len(state['input']) > 10000:
        raise NodeInterrupt("Input too long; confirm before processing")

    if state['estimated_cost'] > 100:
        raise NodeInterrupt(f"Estimated cost ${state['estimated_cost']}, approval required")

    return state
  3. Anomaly detection
python
def anomaly_detection(state):
    if detect_unusual_pattern(state):
        raise NodeInterrupt(
            f"Unusual pattern detected: {get_anomaly_description(state)}"
        )

    return state

Combined Usage Example

python
# Static breakpoint: pause before every tool call
graph = builder.compile(
    interrupt_before=["tools"],
    checkpointer=memory
)

# Dynamic breakpoint: decide inside the tools node based on the tool type
def tools_with_dynamic_check(state):
    tool_call = state["messages"][-1].tool_calls[0]

    # High-risk tools need an extra approval
    if tool_call['name'] in ['delete', 'payment']:
        raise NodeInterrupt(
            f"High-risk tool: {tool_call['name']}, second confirmation required"
        )

    # Execute the tool
    return execute_tool(tool_call)

Best Practices

Choosing between them:

  • Fixed workflow → static breakpoint
  • Intelligent judgment → dynamic breakpoint
  • Multi-level approval → combine both

Performance notes:

  • Static breakpoints are cheap (set at compile time)
  • Dynamic breakpoints require a run-time check (but are more flexible)

Question 2: What the as_node Parameter of update_state Does

Core Purpose

The as_node parameter tells LangGraph which node a state update comes from, and therefore which node should execute next.

With and Without as_node

Without as_node (wrong):

python
# ❌ The state is updated, but the graph doesn't know where to continue from
graph.update_state(thread, {"messages": [HumanMessage("new message")]})

# What happens when you call stream(None, thread)?
# → The graph doesn't know what the next node is; it may misbehave or re-execute nodes

With as_node (correct):

python
# ✅ Simulate the output of the human_feedback node
graph.update_state(
    thread,
    {"messages": [HumanMessage("new message")]},
    as_node="human_feedback"  # Tell the graph: this is human_feedback's output
)

# Calling stream(None, thread)
# → The graph treats human_feedback as done and runs its successors

How It Works

Example graph structure:

START → [human_feedback] → [assistant] → [tools] → END

Execution flow with as_node:

python
# 1. The graph pauses before human_feedback
for event in graph.stream(input, thread):
    print(event)

# 2. Update state as the human_feedback node
graph.update_state(
    thread,
    {"messages": user_feedback},
    as_node="human_feedback"
)

# 3. Resume (execution starts at assistant)
for event in graph.stream(None, thread):
    print(event)  # human_feedback → assistant → tools → END

The confusion without as_node:

python
# 1. Paused before human_feedback
# 2. Update state without naming a node
graph.update_state(thread, {"messages": user_feedback})

# 3. On resume
# → The graph may start from the beginning
# → Or re-execute human_feedback
# → Or behave unpredictably

Practical Examples

Scenario 1: a human-feedback node

python
class State(TypedDict):
    messages: list
    user_input: str

def human_feedback(state):
    """Empty node; exists only as a pause point"""
    pass

def assistant(state):
    return {"messages": [llm.invoke(state["messages"])]}

# Build the graph
builder = StateGraph(State)
builder.add_node("human_feedback", human_feedback)
builder.add_node("assistant", assistant)
builder.add_edge(START, "human_feedback")
builder.add_edge("human_feedback", "assistant")

graph = builder.compile(
    interrupt_before=["human_feedback"],
    checkpointer=memory
)

# Execution flow
thread = {"configurable": {"thread_id": "1"}}

# Pause before human_feedback
for event in graph.stream({"messages": []}, thread):
    print(event)

# User input
user_input = input("Your question: ")

# Update as the human_feedback node
graph.update_state(
    thread,
    {"messages": [HumanMessage(user_input)]},
    as_node="human_feedback"  # ⭐ The key
)

# Resume (skip human_feedback, run assistant)
for event in graph.stream(None, thread):
    print(event)

Scenario 2: an error-correction node

python
def error_handler(state):
    """Check for errors and request a fix"""
    if state.get("error"):
        # Human correction needed
        raise NodeInterrupt("Error found; correction required")
    return state

# An error occurs
for event in graph.stream(input, thread):
    print(event)

# Correct the error by hand
state = graph.get_state(thread)  # fetch the paused state first
corrected_data = fix_error(state.values)

graph.update_state(
    thread,
    corrected_data,
    as_node="error_handler"  # Pretend the error has been fixed
)

# Resume
for event in graph.stream(None, thread):
    print(event)

Advanced Usage: Controlling the Execution Flow

Skipping nodes:

python
# Graph structure: A → B → C → D
# To skip B and C and start directly at D:

graph.update_state(
    thread,
    new_state,
    as_node="C"  # Pretend C has already run
)

# Resume → D executes next

Re-executing a node:

python
# Fetch the history
history = list(graph.get_state_history(thread))
before_node_b = history[-3]

# Modify the state, attributing it to A's output
graph.update_state(
    before_node_b.config,
    modified_state,
    as_node="A"
)

# Resume → B runs again

Common Mistakes

Mistake 1: forgetting as_node

python
# ❌ Wrong
graph.update_state(thread, {"messages": [HumanMessage("hi")]})
graph.stream(None, thread)  # may misbehave

# ✅ Correct
graph.update_state(thread, {"messages": [HumanMessage("hi")]}, as_node="human")
graph.stream(None, thread)

Mistake 2: as_node names a node that doesn't exist

python
# ❌ Wrong: the node name is misspelled
graph.update_state(thread, data, as_node="assitant")  # typo

# ✅ Correct
graph.update_state(thread, data, as_node="assistant")

Mistake 3: mismatch with the graph structure

python
# Graph structure: A → B → C

# ❌ Wrong: nothing follows C
graph.update_state(thread, data, as_node="C")
graph.stream(None, thread)  # cannot continue

# ✅ Correct: pick an appropriate node
graph.update_state(thread, data, as_node="B")
graph.stream(None, thread)  # C executes next

Question 3: The Differences Between the Four Streaming Modes

Comparing the Four Modes

| Mode | Granularity | Output | Data volume | Typical use |
|---|---|---|---|---|
| updates | Node level | Incremental state updates | Small | Tracking state changes |
| values | Node level | Full state snapshots | Large | Debugging, state inspection |
| astream_events | Event level | Every execution event | Largest | Token streaming |
| messages (API) | Token level | Message stream | Small | Chat applications |

Mode 1: updates

What it does: returns only each node's incremental state updates

python
for chunk in graph.stream(input, thread, stream_mode="updates"):
    print(chunk)

Sample output:

python
{
    'assistant': {
        'messages': [AIMessage(content='Hello!', id='run-123')]
    }
}

{
    'tools': {
        'messages': [ToolMessage(content='6', name='multiply')]
    }
}

Output shape:

python
{
    'node name': {
        'updated field': new_value
    }
}

When to use:

  • You only care about changes, not the full state
  • Reducing the amount of data sent over the network
  • Tracking each node's contribution

Code example:

python
for chunk in graph.stream(input, thread, stream_mode="updates"):
    for node_name, update in chunk.items():
        print(f"[{node_name}] updated:")
        for field, value in update.items():
            print(f"  {field}: {value}")

Mode 2: values

What it does: returns the full state after each node executes

python
for event in graph.stream(input, thread, stream_mode="values"):
    print(event)

Sample output:

python
# First output (initial state)
{
    'messages': [HumanMessage(content='Multiply 2 and 3')]
}

# Second output (after assistant)
{
    'messages': [
        HumanMessage(content='Multiply 2 and 3'),
        AIMessage(content='', tool_calls=[...])
    ]
}

# Third output (after tools)
{
    'messages': [
        HumanMessage(content='Multiply 2 and 3'),
        AIMessage(content='', tool_calls=[...]),
        ToolMessage(content='6', name='multiply')
    ]
}

When to use:

  • Debugging: inspecting the full state at every step
  • State validation: checking that the data is correct
  • Understanding the execution flow

Code example:

python
for event in graph.stream(input, thread, stream_mode="values"):
    print(f"Message count: {len(event['messages'])}")
    for msg in event['messages']:
        msg.pretty_print()
    print("=" * 50)

Mode 3: astream_events

What it does: emits every event during graph execution, enabling token-level streaming

python
async for event in graph.astream_events(input, thread, version="v2"):
    print(event)

Event types:

python
# Node starts
{"event": "on_chain_start", "name": "assistant"}

# Prompt processing
{"event": "on_prompt_start", "name": "ChatPromptTemplate"}

# LLM starts
{"event": "on_chat_model_start", "name": "ChatOpenAI"}

# Token stream (⭐ the important one)
{"event": "on_chat_model_stream", "data": {"chunk": AIMessageChunk(content="The")}}
{"event": "on_chat_model_stream", "data": {"chunk": AIMessageChunk(content=" result")}}
{"event": "on_chat_model_stream", "data": {"chunk": AIMessageChunk(content=" is")}}

# LLM ends
{"event": "on_chat_model_end", "data": {"output": AIMessage(...)}}

Token streaming implementation:

python
async for event in graph.astream_events(input, thread, version="v2"):
    # Handle only the token-stream events
    if event["event"] == "on_chat_model_stream":
        token = event["data"]["chunk"].content
        if token:
            print(token, end="", flush=True)

Filtering tokens from a specific node:

python
target_node = "assistant"

async for event in graph.astream_events(input, thread, version="v2"):
    if (event["event"] == "on_chat_model_stream" and
        event["metadata"].get("langgraph_node") == target_node):
        token = event["data"]["chunk"].content
        print(token, end="", flush=True)

When to use:

  • ChatGPT-style typewriter effects
  • Showing the AI's "thinking" in real time
  • Fine-grained execution monitoring

Mode 4: messages (API only)

What it does: a mode optimized by the LangGraph API Server that handles message diffing automatically

python
# Requires the LangGraph API
from langgraph_sdk import get_client

client = get_client(url="http://127.0.0.1:2024")

async for event in client.runs.stream(
    thread_id,
    assistant_id="agent",
    input=input_data,
    stream_mode="messages"
):
    print(event.event, event.data)

Event types:

python
# Metadata
{"event": "metadata", "data": {"run_id": "..."}}

# Complete messages
{"event": "messages/complete", "data": [{"content": "Hello", "role": "user"}]}

# Partial messages (token stream)
{"event": "messages/partial", "data": [{"content": "The"}]}
{"event": "messages/partial", "data": [{"content": "The result"}]}
{"event": "messages/partial", "data": [{"content": "The result is"}]}

# Message metadata
{"event": "messages/metadata", "data": {"finish_reason": "stop"}}

When to use:

  • Production-grade chat applications
  • Integrating with front-end frameworks
  • More efficient network transport

Compared with local astream_events

| Feature | astream_events | messages (API) |
|---|---|---|
| Availability | Local graphs | API Server only |
| Token stream | Requires event filtering | Handled automatically |
| Message diffing | Computed manually | Computed automatically |
| Network optimization | No | Yes |

Choosing a Mode

python
# Scenario 1: tracking state changes
for chunk in graph.stream(input, thread, stream_mode="updates"):
    log_state_changes(chunk)

# Scenario 2: debugging and development
for event in graph.stream(input, thread, stream_mode="values"):
    debug_full_state(event)

# Scenario 3: chat application (local)
async for event in graph.astream_events(input, thread, version="v2"):
    if event["event"] == "on_chat_model_stream":
        display_token(event["data"]["chunk"].content)

# Scenario 4: chat application (API, recommended)
async for event in client.runs.stream(..., stream_mode="messages"):
    if event.event == "messages/partial":
        display_token(event.data[0]["content"])

Performance Considerations

| Mode | Data volume | Network overhead | CPU overhead |
|---|---|---|---|
| updates | Smallest | Smallest | Smallest |
| values | Large | Large | Medium |
| astream_events | Largest | Largest | Large |
| messages (API) | Small | Small | Small (server-side) |

Recommendations:

  • In production, prefer updates or messages (API)
  • Use values while debugging
  • For token streaming, use messages (API) or a well-filtered astream_events

Question 4: Building a Content Moderation System with Conditional Approval

Requirements

Build a moderation system that decides, based on a risk score, whether human review is needed:

  • High risk (> 0.9): reject automatically
  • Medium risk (0.5-0.9): human review
  • Low risk (< 0.5): approve automatically

Complete Implementation

python
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from typing import TypedDict, Literal
import json

# Define the state
class ContentState(TypedDict):
    content: str
    risk_score: float
    issues: list[str]
    review_decision: str  # "approve", "reject", "modify"
    status: str
    message: str  # result message shown to the user

# Risk assessment node
def assess_risk(state: ContentState):
    """Score the content's risk with an AI model"""
    content = state['content']

    # Call a content moderation API (example)
    result = moderation_api.analyze(content)

    return {
        "risk_score": result.score,
        "issues": result.detected_issues
    }

# Dynamic breakpoint node
def conditional_approval(state: ContentState):
    """Decide from the risk score whether human review is needed"""
    score = state['risk_score']
    issues = state.get('issues', [])

    # High risk: reject automatically
    if score > 0.9:
        return {
            "status": "auto_rejected",
            "review_decision": "reject"
        }

    # Medium risk: trigger human review
    if score > 0.5:
        # Prepare the details for the reviewer
        interrupt_info = {
            "risk_score": score,
            "issues": issues,
            "content_preview": state['content'][:200],
            "recommendation": "human review recommended"
        }

        # Raise NodeInterrupt
        raise NodeInterrupt(json.dumps(interrupt_info, ensure_ascii=False))

    # Low risk: approve automatically
    return {
        "status": "auto_approved",
        "review_decision": "approve"
    }

# Decision execution node
def execute_decision(state: ContentState):
    """Act on the review decision"""
    decision = state.get('review_decision')

    if decision == "approve":
        return {"status": "approved", "message": "Content published"}
    elif decision == "reject":
        return {"status": "rejected", "message": "Content rejected"}
    elif decision == "modify":
        # Ask the user to revise
        return {"status": "needs_modification"}
    else:
        return {"status": "pending"}

# Build the graph
builder = StateGraph(ContentState)

builder.add_node("assess_risk", assess_risk)
builder.add_node("conditional_approval", conditional_approval)
builder.add_node("execute_decision", execute_decision)

builder.add_edge(START, "assess_risk")
builder.add_edge("assess_risk", "conditional_approval")
builder.add_edge("conditional_approval", "execute_decision")
builder.add_edge("execute_decision", END)

# Compile (dynamic breakpoints require a checkpointer)
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# Driving the flow
def moderate_content(content: str, thread_id: str):
    """End-to-end moderation flow"""
    thread = {"configurable": {"thread_id": thread_id}}

    # 1. Submit the content
    input_data = {"content": content}

    for event in graph.stream(input_data, thread, stream_mode="values"):
        print(f"Status: {event.get('status', 'processing')}")

    # 2. Check whether human review is needed
    state = graph.get_state(thread)

    if state.tasks and state.tasks[0].interrupts:
        # Parse the interrupt payload
        interrupt_info = json.loads(state.tasks[0].interrupts[0].value)

        print("\n⚠️  Human review required:")
        print(f"  Risk score: {interrupt_info['risk_score']}")
        print(f"  Detected issues: {', '.join(interrupt_info['issues'])}")
        print(f"  Content preview: {interrupt_info['content_preview']}")

        # 3. Collect the human decision
        print("\nChoose an action:")
        print("  1. Approve")
        print("  2. Reject")
        print("  3. Request changes (modify)")

        choice = input("Choice (1/2/3): ")

        decision_map = {"1": "approve", "2": "reject", "3": "modify"}
        decision = decision_map.get(choice, "reject")

        # 4. Record the decision and resume
        graph.update_state(
            thread,
            {"review_decision": decision},
            as_node="conditional_approval"
        )

        # 5. Resume execution
        for event in graph.stream(None, thread, stream_mode="values"):
            print(f"Final status: {event.get('status')}")
            print(f"Message: {event.get('message')}")

    else:
        # Handled automatically (no human review)
        print(f"✅ Handled automatically: {state.values.get('status')}")

# Usage
if __name__ == "__main__":
    # Test 1: low-risk content (auto-approved)
    moderate_content("A technical article", "thread_1")

    # Test 2: medium-risk content (human review)
    moderate_content("An article with some controversial claims", "thread_2")

    # Test 3: high-risk content (auto-rejected)
    moderate_content("Content with serious violations", "thread_3")

Going Further

1. Multi-dimensional risk scoring:

python
def advanced_risk_assessment(state):
    """Score risk along several dimensions"""
    content = state['content']

    # Per-dimension scores
    violence_score = check_violence(content)
    hate_speech_score = check_hate_speech(content)
    spam_score = check_spam(content)

    # Overall score
    overall_score = max(violence_score, hate_speech_score, spam_score)

    # Detailed issue list
    issues = []
    if violence_score > 0.5:
        issues.append(f"Violent content ({violence_score:.2f})")
    if hate_speech_score > 0.5:
        issues.append(f"Hate speech ({hate_speech_score:.2f})")
    if spam_score > 0.5:
        issues.append(f"Spam ({spam_score:.2f})")

    return {
        "risk_score": overall_score,
        "issues": issues,
        "violence_score": violence_score,
        "hate_speech_score": hate_speech_score,
        "spam_score": spam_score
    }

2. Approval history:

python
from datetime import datetime

approval_log = []

def log_approval_decision(thread_id, content, score, decision, reviewer):
    """Record an approval decision"""
    approval_log.append({
        "timestamp": datetime.now().isoformat(),
        "thread_id": thread_id,
        "content_hash": hash(content),
        "risk_score": score,
        "decision": decision,
        "reviewer": reviewer
    })

# Call after human review
log_approval_decision(
    thread_id,
    state['content'],
    state['risk_score'],
    decision,
    reviewer="admin_001"
)

3. Adaptive thresholds:

python
class AdaptiveThreshold:
    """Self-adjusting risk threshold"""

    def __init__(self, initial_threshold=0.5):
        self.threshold = initial_threshold
        self.approval_history = []

    def update(self, score, human_decision):
        """Adjust the threshold based on human decisions"""
        self.approval_history.append((score, human_decision))

        # Low-scoring content got rejected: lower the threshold
        if score < self.threshold and human_decision == "reject":
            self.threshold = max(0.3, self.threshold - 0.05)

        # High-scoring content got approved: raise the threshold
        if score > self.threshold and human_decision == "approve":
            self.threshold = min(0.8, self.threshold + 0.05)

    def should_review(self, score):
        """Decide whether human review is needed"""
        return score > self.threshold

# Usage
threshold_manager = AdaptiveThreshold()

def smart_conditional_approval(state):
    score = state['risk_score']

    if threshold_manager.should_review(score):
        raise NodeInterrupt(f"Risk score {score} exceeds threshold {threshold_manager.threshold}")

    return {"status": "auto_approved"}

Question 5: Implementing ChatGPT-Style Token-Level Streaming

Comparing the Options

| Approach | Environment | Complexity | Recommended |
|---|---|---|---|
| astream_events | Local | Medium | ⭐⭐⭐⭐ |
| messages (API) | LangGraph Server | Low | ⭐⭐⭐⭐⭐ |

Option 1: local astream_events

python
import asyncio
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver

# Define the model
llm = ChatOpenAI(model="gpt-4", streaming=True)  # streaming enabled

# Define the node
def chatbot(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}

# Build the graph
builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

graph = builder.compile(checkpointer=MemorySaver())

# Token streaming
async def stream_chat_response(user_message: str, thread_id: str):
    """Stream a chat response token by token"""
    from langchain_core.messages import HumanMessage

    config = {"configurable": {"thread_id": thread_id}}
    input_data = {"messages": [HumanMessage(content=user_message)]}

    print(f"User: {user_message}")
    print("AI: ", end="", flush=True)

    full_response = ""

    async for event in graph.astream_events(input_data, config, version="v2"):
        # Filter for token-stream events from our node
        if (event["event"] == "on_chat_model_stream" and
            event["metadata"].get("langgraph_node") == "chatbot"):

            token = event["data"]["chunk"].content

            if token:
                print(token, end="", flush=True)
                full_response += token

    print("\n")  # newline
    return full_response

# Usage
if __name__ == "__main__":
    asyncio.run(stream_chat_response(
        "What is LangGraph?",
        "user_123"
    ))

Output:

User: What is LangGraph?
AI: LangGraph is a framework for building stateful, multi-step AI applications...

Option 2: the API's messages mode (recommended)

Start the API server:

bash
cd your_project
langgraph dev

Client implementation:

python
import asyncio
from langgraph_sdk import get_client
from langchain_core.messages import HumanMessage

async def stream_chat_api(user_message: str):
    """Stream tokens via the API's messages mode"""

    # Connect to the LangGraph Server
    client = get_client(url="http://127.0.0.1:2024")

    # Create a thread
    thread = await client.threads.create()

    print(f"User: {user_message}")
    print("AI: ", end="", flush=True)

    full_response = ""

    async for event in client.runs.stream(
        thread["thread_id"],
        assistant_id="chatbot",
        input={"messages": [HumanMessage(content=user_message)]},
        stream_mode="messages"
    ):
        # Handle partial messages (the token stream)
        if event.event == "messages/partial":
            for item in event.data:
                if "content" in item and item.get("type") != "human":
                    # Extract the newly arrived tokens
                    content = item["content"]

                    # Compute the delta against what we've printed so far
                    if len(content) > len(full_response):
                        new_token = content[len(full_response):]
                        print(new_token, end="", flush=True)
                        full_response = content

    print("\n")
    return full_response

# Usage
asyncio.run(stream_chat_api("Explain token streaming"))

Optimization Tips

1. Batch the output (less I/O):

python
async def batched_stream(user_message: str, batch_size=3):
    """Emit tokens in batches"""
    buffer = []

    async for event in graph.astream_events(...):
        if event["event"] == "on_chat_model_stream":
            token = event["data"]["chunk"].content

            if token:
                buffer.append(token)

                # Flush when the buffer is full
                if len(buffer) >= batch_size:
                    print("".join(buffer), end="", flush=True)
                    buffer.clear()

    # Flush any remaining tokens
    if buffer:
        print("".join(buffer), end="", flush=True)

2. Add a typewriter effect:

python
import asyncio

async def typewriter_effect(user_message: str, delay=0.05):
    """Typewriter effect"""
    async for event in graph.astream_events(...):
        if event["event"] == "on_chat_model_stream":
            token = event["data"]["chunk"].content

            if token:
                print(token, end="", flush=True)
                await asyncio.sleep(delay)  # pacing delay

3. Show progress through the graph:

python
async def stream_with_progress(user_message: str):
    """Show which node is 'thinking'"""
    current_node = None

    async for event in graph.astream_events(...):
        # Node starts
        if event["event"] == "on_chain_start":
            node = event["metadata"].get("langgraph_node")
            if node and node != current_node:
                current_node = node
                print(f"\n[{node}] ", end="", flush=True)

        # Token output
        elif event["event"] == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)

Web Application Integration

FastAPI + SSE (Server-Sent Events):

python
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

@app.get("/chat/stream")  # EventSource on the front end only supports GET
async def chat_stream(message: str, thread_id: str):
    """SSE streaming chat endpoint"""

    async def event_generator():
        config = {"configurable": {"thread_id": thread_id}}
        input_data = {"messages": [HumanMessage(content=message)]}

        async for event in graph.astream_events(input_data, config, version="v2"):
            if event["event"] == "on_chat_model_stream":
                token = event["data"]["chunk"].content

                if token:
                    # SSE format
                    yield {
                        "event": "token",
                        "data": token
                    }

        # Send the completion signal
        yield {
            "event": "done",
            "data": "[DONE]"
        }

    return EventSourceResponse(event_generator())

# The front end consumes this with EventSource

Front-end JavaScript example:

javascript
const eventSource = new EventSource(`/chat/stream?message=hello&thread_id=123`);

let responseDiv = document.getElementById('response');

eventSource.addEventListener('token', (e) => {
    responseDiv.textContent += e.data;
});

eventSource.addEventListener('done', (e) => {
    eventSource.close();
    console.log('Stream completed');
});

Error Handling

python
async def safe_stream(user_message: str):
    """Streaming with error handling"""
    full_response = ""  # defined before try so the handlers can return it
    try:
        async for event in graph.astream_events(...):
            if event["event"] == "on_chat_model_stream":
                token = event["data"]["chunk"].content

                if token:
                    print(token, end="", flush=True)
                    full_response += token

        return full_response

    except KeyboardInterrupt:
        print("\n\n⚠️  Interrupted by user")
        return full_response

    except asyncio.TimeoutError:
        print("\n\n⚠️  Response timed out")
        return full_response

    except Exception as e:
        print(f"\n\n❌ Error: {e}")
        return full_response

Question 6: Implementing "Undo" with Time Travel

Core Idea

Use the historical states that LangGraph's checkpointer saves automatically to implement "undo to a previous step", as sketched below.
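
Before the full class, the idea in its smallest form (this assumes a graph already compiled with a checkpointer that has run a few steps on this thread):

python
config = {"configurable": {"thread_id": "demo"}}

history = list(graph.get_state_history(config))  # newest checkpoint first
previous = history[1]                            # one checkpoint back

# Resuming with the older checkpoint's config replays from that point
for event in graph.stream(None, previous.config, stream_mode="values"):
    print(event)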

Complete Implementation

python
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

class UndoableAgent:
    """An agent that supports undo"""

    def __init__(self):
        # The LLM
        self.llm = ChatOpenAI(model="gpt-4")

        # Build the graph
        builder = StateGraph(MessagesState)
        builder.add_node("assistant", self.assistant_node)
        builder.add_edge(START, "assistant")
        builder.add_edge("assistant", END)

        # A checkpointer is required
        self.memory = MemorySaver()
        self.graph = builder.compile(checkpointer=self.memory)

    def assistant_node(self, state: MessagesState):
        """Assistant node"""
        return {"messages": [self.llm.invoke(state["messages"])]}

    def execute(self, user_message: str, thread_id: str):
        """Run and return the response"""
        thread = {"configurable": {"thread_id": thread_id}}
        input_data = {"messages": [HumanMessage(content=user_message)]}

        # Run the graph
        result = None
        for event in self.graph.stream(input_data, thread, stream_mode="values"):
            result = event

        return result['messages'][-1].content

    def undo(self, thread_id: str, steps=1):
        """Undo the most recent N checkpoints"""
        thread = {"configurable": {"thread_id": thread_id}}

        # Fetch the history (newest first)
        history = list(self.graph.get_state_history(thread))

        if len(history) <= steps:
            print("⚠️  Cannot undo: not enough history")
            return None

        # The state N checkpoints back
        target_state = history[steps]

        print(f"✅ Rolled back {steps} step(s)")
        print(f"   Checkpoint ID: {target_state.config['configurable']['checkpoint_id'][:8]}...")

        return target_state

    def get_history_summary(self, thread_id: str):
        """Print a history summary"""
        thread = {"configurable": {"thread_id": thread_id}}
        history = list(self.graph.get_state_history(thread))

        print(f"\n📜 History ({len(history)} states):")
        for i, state in enumerate(history):
            step_num = len(history) - i - 1
            # created_at is an ISO-format string; show the time portion
            timestamp = state.created_at[11:19] if state.created_at else "N/A"
            msg_count = len(state.values.get('messages', []))

            # Preview of the last message
            if msg_count > 0:
                last_msg = state.values['messages'][-1]
                preview = last_msg.content[:50] if hasattr(last_msg, 'content') else str(last_msg)[:50]
            else:
                preview = "(empty)"

            print(f"  [{step_num}] {timestamp} - {msg_count} message(s) - {preview}")

        return history

    def continue_from_checkpoint(self, checkpoint_config):
        """Resume from a specific checkpoint"""
        result = None
        for event in self.graph.stream(None, checkpoint_config, stream_mode="values"):
            result = event

        return result

# Demo
def demo_undo():
    """Demonstrate undo"""
    agent = UndoableAgent()
    thread_id = "demo_thread"

    print("=== Conversation start ===\n")

    # Step 1
    print("User: Hello")
    response1 = agent.execute("Hello", thread_id)
    print(f"AI: {response1}\n")

    # Step 2
    print("User: I want to learn about LangGraph")
    response2 = agent.execute("I want to learn about LangGraph", thread_id)
    print(f"AI: {response2}\n")

    # Step 3
    print("User: What are its advantages?")
    response3 = agent.execute("What are its advantages?", thread_id)
    print(f"AI: {response3}\n")

    # Show the history
    history = agent.get_history_summary(thread_id)

    # Undo the last 2 checkpoints
    print("\n=== Undoing ===")
    target_state = agent.undo(thread_id, steps=2)

    if target_state:
        # Inspect the state after undo
        print("\nMessages after undo:")
        for msg in target_state.values['messages']:
            print(f"  - {msg.content[:50]}")

        # Continue a new conversation from the undo point
        print("\n=== New branch from the undo point ===")

        # Running new input against the old checkpoint's config forks the thread
        print("User: Actually, I want to learn about Python")
        new_input = {"messages": [HumanMessage(content="Actually, I want to learn about Python")]}
        result = None
        for event in agent.graph.stream(new_input, target_state.config, stream_mode="values"):
            result = event
        print(f"AI: {result['messages'][-1].content}")

if __name__ == "__main__":
    demo_undo()

Advanced: A Multi-Level Undo Stack

python
class AdvancedUndoAgent(UndoableAgent):
    """An agent with multi-level undo/redo"""

    def __init__(self):
        super().__init__()
        self.undo_stack = []  # undo stack
        self.redo_stack = []  # redo stack

    def execute(self, user_message: str, thread_id: str):
        """Run and push onto the undo stack"""
        # State before the operation
        thread = {"configurable": {"thread_id": thread_id}}
        before_state = self.graph.get_state(thread)

        # Run the operation
        response = super().execute(user_message, thread_id)

        # State after the operation
        after_state = self.graph.get_state(thread)

        # Push onto the undo stack
        self.undo_stack.append({
            "before": before_state,
            "after": after_state,
            "action": user_message
        })

        # Clear the redo stack
        self.redo_stack.clear()

        return response

    def undo_last(self, thread_id: str):
        """Undo the most recent operation"""
        if not self.undo_stack:
            print("⚠️  Cannot undo: the undo stack is empty")
            return False

        # Pop the most recent operation
        last_action = self.undo_stack.pop()

        # Push it onto the redo stack
        self.redo_stack.append(last_action)

        # Restore the earlier state (by forking).
        # Caveat: with the add_messages reducer, writing old values overwrites
        # same-ID messages but does not delete newer ones; for a strict rollback,
        # resume from the old checkpoint's config instead (see UndoableAgent.undo).
        thread = {"configurable": {"thread_id": thread_id}}
        self.graph.update_state(
            thread,
            last_action["before"].values
        )

        print(f"✅ Undid: {last_action['action'][:50]}")
        return True

    def redo_last(self, thread_id: str):
        """Redo the most recently undone operation"""
        if not self.redo_stack:
            print("⚠️  Cannot redo: the redo stack is empty")
            return False

        # Pop the redo stack
        action = self.redo_stack.pop()

        # Restore the post-operation state
        thread = {"configurable": {"thread_id": thread_id}}
        self.graph.update_state(
            thread,
            action["after"].values
        )

        # Push back onto the undo stack
        self.undo_stack.append(action)

        print(f"✅ Redid: {action['action'][:50]}")
        return True

A User Interface

python
def interactive_chat_with_undo():
    """Interactive chat with undo support"""
    agent = AdvancedUndoAgent()
    thread_id = "interactive_session"

    print("🤖 A chatbot with undo")
    print("   Commands: /undo, /redo, /history, /quit\n")

    while True:
        user_input = input("You: ")

        # Handle commands
        if user_input == "/quit":
            break
        elif user_input == "/undo":
            agent.undo_last(thread_id)
            continue
        elif user_input == "/redo":
            agent.redo_last(thread_id)
            continue
        elif user_input == "/history":
            agent.get_history_summary(thread_id)
            continue

        # Normal conversation
        try:
            response = agent.execute(user_input, thread_id)
            print(f"AI: {response}\n")
        except Exception as e:
            print(f"❌ Error: {e}\n")

if __name__ == "__main__":
    interactive_chat_with_undo()

Visualizing the Timeline

python
def visualize_timeline(agent, thread_id):
    """Visualize the conversation timeline"""
    thread = {"configurable": {"thread_id": thread_id}}
    history = list(agent.graph.get_state_history(thread))

    print("\n📊 Conversation timeline:")
    print("=" * 60)

    for i, state in enumerate(reversed(history)):
        step_num = len(history) - i - 1
        indent = "    " * (step_num % 3)  # indentation to suggest depth

        # Message count
        msg_count = len(state.values.get('messages', []))

        # Display
        print(f"{indent}[{step_num}] {msg_count} message(s)")

        # Last message preview
        if msg_count > 0:
            last_msg = state.values['messages'][-1]
            if hasattr(last_msg, 'content'):
                preview = last_msg.content[:40]
                print(f"{indent}    └─ {preview}...")

        # Branch marker
        if i < len(history) - 1:
            print(f"{indent}    │")

    print("=" * 60)

Question 7: A Customer-Service Agent That Pauses Before Sensitive Operations and Allows State Edits

Requirements

Build a customer-service agent that can:

  1. Pause before sensitive operations (e.g., refunds, order deletion)
  2. Show the operation details for human review
  3. Allow the operation's parameters to be modified
  4. Continue or cancel based on the review
完整实现

python
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, List, Literal
from langchain_openai import ChatOpenAI

# 定义工具
def查询订单(order_id: str) -> dict:
    """查询订单信息"""
    return {"order_id": order_id, "status": "已发货", "amount": 299.00}

def申请退款(order_id: str, amount: float, reason: str) -> dict:
    """申请退款(敏感操作)"""
    return {"order_id": order_id, "refund_amount": amount, "status": "退款成功"}

def删除订单(order_id: str) -> dict:
    """删除订单(敏感操作)"""
    return {"order_id": order_id, "status": "已删除"}

# 工具分类
SENSITIVE_TOOLS = ["申请退款", "删除订单"]

# 定义状态
class CustomerServiceState(TypedDict):
    messages: List
    pending_tool: dict  # 待执行的工具
    approval_status: Literal["pending", "approved", "rejected"]
    modified_params: dict  # 修改后的参数

# 构建 Agent
llm = ChatOpenAI(model="gpt-4")
tools = [查询订单, 申请退款, 删除订单]
llm_with_tools = llm.bind_tools(tools)

def assistant(state: CustomerServiceState):
    """助手节点:生成工具调用"""
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

def check_if_sensitive(state: CustomerServiceState):
    """检查是否需要人工审批"""
    last_message = state["messages"][-1]
    
    if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
        tool_call = last_message.tool_calls[0]
        
        if tool_call['name'] in SENSITIVE_TOOLS:
            # 保存待审批的工具调用
            return {
                "pending_tool": tool_call,
                "approval_status": "pending"
            }
    
    return {"approval_status": "approved"}

def route_after_check(state: CustomerServiceState):
    """根据审批状态路由"""
    if state.get("approval_status") == "pending":
        return "wait_for_approval"  # 需要审批
    else:
        return "execute_tool"  # 直接执行

def wait_for_approval(state: CustomerServiceState):
    """等待审批节点(空节点,用于 breakpoint)"""
    pass

def execute_tool(state: CustomerServiceState):
    """执行工具节点"""
    # 获取工具调用(可能已被修改)
    if state.get("modified_params"):
        # 使用修改后的参数
        tool_call = state["pending_tool"].copy()
        tool_call['args'] = state["modified_params"]
    else:
        # 使用原始参数
        tool_call = state["messages"][-1].tool_calls[0]
    
    # 执行工具
    tool_name = tool_call['name']
    tool_args = tool_call['args']
    
    if tool_name == "查询订单":
        result = 查询订单(**tool_args)
    elif tool_name == "申请退款":
        result = 申请退款(**tool_args)
    elif tool_name == "删除订单":
        result = 删除订单(**tool_args)
    
    # 返回工具结果
    from langchain_core.messages import ToolMessage
    return {"messages": [ToolMessage(
        content=str(result),
        tool_call_id=tool_call['id']
    )]}

# 构建图
builder = StateGraph(CustomerServiceState)

builder.add_node("assistant", assistant)
builder.add_node("check_if_sensitive", check_if_sensitive)
builder.add_node("wait_for_approval", wait_for_approval)
builder.add_node("execute_tool", execute_tool)

builder.add_edge(START, "assistant")
builder.add_edge("assistant", "check_if_sensitive")
builder.add_conditional_edges(
    "check_if_sensitive",
    route_after_check,
    {
        "wait_for_approval": "wait_for_approval",
        "execute_tool": "execute_tool"
    }
)
builder.add_edge("wait_for_approval", "execute_tool")
builder.add_edge("execute_tool", "assistant")

# 编译(在 wait_for_approval 前中断)
memory = MemorySaver()
graph = builder.compile(
    interrupt_before=["wait_for_approval"],
    checkpointer=memory
)

# The full flow
def customer_service_flow():
    """End-to-end customer-service flow"""
    from langchain_core.messages import HumanMessage

    thread = {"configurable": {"thread_id": "customer_001"}}

    # The user asks for a refund
    print("=== User request ===")
    user_request = "I'd like a refund for order #12345, amount 299 yuan"
    print(f"User: {user_request}\n")

    input_data = {"messages": [HumanMessage(content=user_request)]}

    # Run until the breakpoint
    for event in graph.stream(input_data, thread, stream_mode="values"):
        if 'pending_tool' in event:
            print("⚠️  Sensitive operation detected; awaiting approval...")
            break

    # Inspect the pending operation
    state = graph.get_state(thread)
    pending_tool = state.values.get('pending_tool')

    if pending_tool:
        print("\n=== Operation awaiting approval ===")
        print(f"Tool: {pending_tool['name']}")
        print(f"Args: {pending_tool['args']}")

        # Human review
        print("\n=== Review options ===")
        print("1. Approve (original parameters)")
        print("2. Approve (edited parameters)")
        print("3. Reject")

        choice = input("\nChoice (1/2/3): ")

        if choice == "1":
            # Approve with the original parameters
            print("\n✅ Approved, executing...")
            for event in graph.stream(None, thread, stream_mode="values"):
                if 'messages' in event:
                    last_msg = event['messages'][-1]
                    if hasattr(last_msg, 'content'):
                        print(f"Result: {last_msg.content}")

        elif choice == "2":
            # Edit the parameters, then approve
            print("\n📝 Edit parameters:")
            new_amount = input(f"  Refund amount (was: {pending_tool['args'].get('amount')}): ")
            new_reason = input(f"  Refund reason (was: {pending_tool['args'].get('reason')}): ")

            # Update the state
            modified_params = pending_tool['args'].copy()
            if new_amount:
                modified_params['amount'] = float(new_amount)
            if new_reason:
                modified_params['reason'] = new_reason

            graph.update_state(
                thread,
                {"modified_params": modified_params},
                as_node="wait_for_approval"
            )

            print("\n✅ Approved (edited parameters), executing...")
            for event in graph.stream(None, thread, stream_mode="values"):
                if 'messages' in event:
                    last_msg = event['messages'][-1]
                    if hasattr(last_msg, 'content'):
                        print(f"Result: {last_msg.content}")

        else:
            # Reject
            print("\n❌ Operation rejected")
            # A rejection reason could be written back to the state here

if __name__ == "__main__":
    customer_service_flow()

Going Further: An Approval Audit Log

python
from datetime import datetime
import json

class ApprovalLogger:
    """Records approval decisions"""

    def __init__(self, log_file="approval_log.jsonl"):
        self.log_file = log_file

    def log(self, thread_id, tool_call, decision, modifier=None, reviewer=None):
        """Write one approval record"""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "thread_id": thread_id,
            "tool": tool_call['name'],
            "original_args": tool_call['args'],
            "modified_args": modifier,
            "decision": decision,  # "approved", "rejected", "modified"
            "reviewer": reviewer or "system"
        }

        with open(self.log_file, "a") as f:
            f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")

        print(f"📝 Approval logged: {decision}")

    def get_logs(self, thread_id=None):
        """Read approval records"""
        logs = []

        with open(self.log_file, "r") as f:
            for line in f:
                log = json.loads(line)
                if thread_id is None or log["thread_id"] == thread_id:
                    logs.append(log)

        return logs

# In the approval flow
logger = ApprovalLogger()

# Record an approval
logger.log(
    thread_id="customer_001",
    tool_call=pending_tool,
    decision="approved",
    reviewer="admin_user"
)

# Record an approval with edits
logger.log(
    thread_id="customer_001",
    tool_call=pending_tool,
    decision="modified",
    modifier=modified_params,
    reviewer="admin_user"
)

Question 8: How Message IDs Work in State Editing and Forking

Core Mechanism

The message ID is the unique identifier in LangChain's message system, and it drives the add_messages reducer:

Reducer rules (demonstrated in the sketch below):

  • Has an ID, and the ID matches an existing message → overwrite (update)
  • Has an ID, and the ID is new → append
  • No ID → append (a new ID is generated)
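
The rules can be observed by calling the reducer directly, outside any graph (a standalone sketch; the output comments are approximate):

python
from langchain_core.messages import HumanMessage
from langgraph.graph.message import add_messages

base = [HumanMessage(content="Hello", id="msg_001")]

# Same ID -> the existing message is overwritten in place
print(add_messages(base, [HumanMessage(content="Hi there", id="msg_001")]))
# -> [HumanMessage(content='Hi there', id='msg_001')]

# Different ID -> appended
print(add_messages(base, [HumanMessage(content="Another", id="msg_002")]))
# -> [HumanMessage(..., id='msg_001'), HumanMessage(..., id='msg_002')]

# No ID -> a UUID is generated and the message is appended
print(add_messages(base, [HumanMessage(content="No ID")]))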

The Overwrite Mechanism

python
from langchain_core.messages import HumanMessage

# Original message
original = HumanMessage(content="Hello", id="msg_001")

# Scenario 1: overwrite (reuse the same ID)
updated = HumanMessage(content="Hi there", id="msg_001")  # same ID
graph.update_state(thread, {"messages": [updated]})
# Result: original is replaced by updated

# Scenario 2: append (use a different ID)
new_msg = HumanMessage(content="Another message", id="msg_002")  # different ID
graph.update_state(thread, {"messages": [new_msg]})
# Result: new_msg is appended to the list

# Scenario 3: append (no ID)
no_id_msg = HumanMessage(content="No ID message")  # no ID
graph.update_state(thread, {"messages": [no_id_msg]})
# Result: an ID is generated and the message is appended

Using IDs When Forking

The heart of forking: overwriting a historical message

python
# Fetch the history
history = list(graph.get_state_history(thread))
fork_point = history[-2]

# The message to modify
original_msg = fork_point.values['messages'][0]
print(f"Original message ID: {original_msg.id}")
print(f"Original content: {original_msg.content}")

# Create the fork: keep the ID, change the content
forked_msg = HumanMessage(
    content="the modified content",
    id=original_msg.id  # ⭐ The key: reuse the same ID
)

# Update the state
fork_config = graph.update_state(
    fork_point.config,
    {"messages": [forked_msg]}
)

# Run from the fork
for event in graph.stream(None, fork_config):
    print(event)

What if you don't keep the ID?

python
# ❌ Wrong: a fresh ID
forked_msg = HumanMessage(content="the modified content")  # new ID

graph.update_state(fork_point.config, {"messages": [forked_msg]})

# Result:
# - The original message is still there
# - The new message is appended at the end
# - The list becomes: [original message, modified content]
# - That's an "append", not a true fork

A Worked Example: A/B Testing

python
def ab_test_prompts(graph, base_checkpoint):
    """A/B test different user inputs"""

    # The original message
    original_msg = base_checkpoint.values['messages'][0]
    original_id = original_msg.id

    # Variant A: the original prompt
    print("=== Variant A (original) ===")
    for event in graph.stream(None, base_checkpoint.config):
        print(event)

    # Variant B: a modified prompt
    print("\n=== Variant B (modified) ===")
    variant_b = HumanMessage(
        content="Answer concisely: " + original_msg.content,
        id=original_id  # reuse the ID to overwrite
    )

    fork_b = graph.update_state(
        base_checkpoint.config,
        {"messages": [variant_b]}
    )

    for event in graph.stream(None, fork_b):
        print(event)

    # Variant C: a different question entirely
    print("\n=== Variant C (replacement) ===")
    variant_c = HumanMessage(
        content="New question: explain what Time Travel is",
        id=original_id  # reuse the ID to overwrite
    )

    fork_c = graph.update_state(
        base_checkpoint.config,
        {"messages": [variant_c]}
    )

    for event in graph.stream(None, fork_c):
        print(event)

How Message IDs Are Assigned

python
from langchain_core.messages import HumanMessage

# No explicit ID: the id attribute starts out as None;
# the add_messages reducer assigns a UUID when the message enters graph state,
# and model-generated messages arrive with ids like "run-abc123..."
msg1 = HumanMessage(content="Hello")
print(msg1.id)  # None

# Explicit ID
msg2 = HumanMessage(content="Hello", id="custom_id_001")
print(msg2.id)  # custom_id_001

# Copying preserves the ID
msg3 = msg1.copy()
print(msg3.id == msg1.id)  # True

# When changing content you can keep or replace the ID
msg4 = HumanMessage(content="New content", id=msg1.id)
print(msg4.id == msg1.id)  # True

Advanced: Batch Message Updates

python
def batch_update_messages(graph, thread, updates):
    """Update several messages at once"""

    state = graph.get_state(thread)
    messages = state.values['messages']

    # Map from ID to replacement
    updates_map = {update.id: update for update in updates}

    # Apply the updates
    updated_messages = []
    for msg in messages:
        if msg.id in updates_map:
            updated_messages.append(updates_map[msg.id])
        else:
            updated_messages.append(msg)

    # Write back
    graph.update_state(thread, {"messages": updated_messages})

# Usage
updates = [
    HumanMessage(content="edited message 1", id="msg_001"),
    HumanMessage(content="edited message 2", id="msg_003")
]

batch_update_messages(graph, thread, updates)

Debugging: Tracing Message IDs

python
def trace_message_ids(state):
    """Trace how message IDs change"""
    messages = state.values.get('messages', [])

    print("\n📋 Message ID trace:")
    for i, msg in enumerate(messages):
        msg_type = type(msg).__name__
        content_preview = msg.content[:30] if hasattr(msg, 'content') else str(msg)[:30]
        print(f"  [{i}] {msg_type} | ID: {str(msg.id)[:8]}... | {content_preview}...")

# Call before and after a state update
print("Before:")
trace_message_ids(state_before)

print("\nAfter:")
trace_message_ids(state_after)

Question 9: Optimizing Streaming Performance for Large-Scale Concurrency

Where the Bottlenecks Are

| Bottleneck | Impact | Optimization |
|---|---|---|
| Event-filtering overhead | CPU-bound | Precompute the filter conditions |
| Frequent output I/O | Network latency | Buffer and batch the output |
| Many concurrent connections | Memory / connection count | Connection pooling, rate limiting |
| Token-by-token transfer | Network round trips | Transfer in batches |

Optimization 1: Cheaper Event Filtering

python
# ❌ Inefficient: a compound condition evaluated on every event
async for event in graph.astream_events(...):
    if (event.get("event") == "on_chat_model_stream" and
        event.get("metadata", {}).get("langgraph_node") == "assistant" and
        "data" in event and
        event["data"].get("chunk") is not None):
        token = event["data"]["chunk"].content
        if token and len(token) > 0:
            print(token, end="")

# ✅ Efficient: precomputed constants, early exits
TARGET_EVENT = "on_chat_model_stream"
TARGET_NODE = "assistant"

async for event in graph.astream_events(...):
    # Quick check on the event type
    if event.get("event") != TARGET_EVENT:
        continue

    # Quick check on the node
    if event.get("metadata", {}).get("langgraph_node") != TARGET_NODE:
        continue

    # Extract the token
    chunk = event.get("data", {}).get("chunk")
    if chunk and chunk.content:
        print(chunk.content, end="", flush=True)

Optimization 2: Buffered, Batched Output

python
import time

class TokenBuffer:
    """Token buffer"""

    def __init__(self, buffer_size=5, flush_interval=0.1):
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.last_flush = time.time()

    def add(self, token):
        """Add a token"""
        self.buffer.append(token)

        # Flush on size or timeout
        if (len(self.buffer) >= self.buffer_size or
            time.time() - self.last_flush >= self.flush_interval):
            return self.flush()

        return None

    def flush(self):
        """Flush the buffer"""
        if not self.buffer:
            return None

        batch = "".join(self.buffer)
        self.buffer.clear()
        self.last_flush = time.time()
        return batch

# Usage
async def buffered_stream(graph, input_data, config):
    """Streaming with buffering"""
    buffer = TokenBuffer(buffer_size=5)

    async for event in graph.astream_events(input_data, config, version="v2"):
        if event["event"] == "on_chat_model_stream":
            token = event["data"]["chunk"].content

            if token:
                batch = buffer.add(token)
                if batch:
                    print(batch, end="", flush=True)

    # Flush the remainder
    final_batch = buffer.flush()
    if final_batch:
        print(final_batch, end="", flush=True)

Optimization 3: Connection-Pool Management

python
import asyncio
import time
from asyncio import Semaphore

class ConcurrentStreamManager:
    """Manages concurrent streams"""

    def __init__(self, max_concurrent=100):
        self.semaphore = Semaphore(max_concurrent)
        self.active_streams = {}

    async def stream_for_user(self, user_id, graph, input_data, config):
        """Stream for one user, with concurrency control"""

        async with self.semaphore:  # cap concurrency
            try:
                self.active_streams[user_id] = {
                    "status": "streaming",
                    "started_at": time.time()
                }

                async for event in graph.astream_events(input_data, config, version="v2"):
                    if event["event"] == "on_chat_model_stream":
                        token = event["data"]["chunk"].content
                        if token:
                            yield token

                self.active_streams[user_id]["status"] = "completed"

            except Exception as e:
                self.active_streams[user_id]["status"] = "error"
                self.active_streams[user_id]["error"] = str(e)
                raise

            finally:
                # Cleanup
                if user_id in self.active_streams:
                    del self.active_streams[user_id]

# Usage
manager = ConcurrentStreamManager(max_concurrent=50)

async def handle_multiple_users():
    """Handle many users concurrently"""
    tasks = []

    for user_id in range(100):
        task = asyncio.create_task(
            process_user_stream(manager, user_id)
        )
        tasks.append(task)

    # Run concurrently (at most 50 at a time)
    await asyncio.gather(*tasks)

async def process_user_stream(manager, user_id):
    """Handle one user's stream"""
    async for token in manager.stream_for_user(
        user_id,
        graph,
        input_data,
        config
    ):
        # Handle the token
        await send_to_user(user_id, token)

Optimization 4: Use the LangGraph API (Recommended)

python
import asyncio
from langgraph_sdk import get_client

class ProductionStreamService:
    """Production-grade streaming service"""

    def __init__(self, api_url="http://localhost:2024"):
        self.client = get_client(url=api_url)
        self.connection_pool = {}

    async def stream_chat(self, user_id, message, thread_id):
        """Streaming chat via the messages mode"""

        try:
            full_response = ""

            async for event in self.client.runs.stream(
                thread_id,
                assistant_id="chatbot",
                input={"messages": [{"role": "user", "content": message}]},
                stream_mode="messages"  # the API-optimized mode
            ):
                if event.event == "messages/partial":
                    for item in event.data:
                        if "content" in item:
                            content = item["content"]

                            # Compute the delta
                            if len(content) > len(full_response):
                                new_tokens = content[len(full_response):]
                                full_response = content

                                # Emit the delta
                                yield new_tokens

        except asyncio.CancelledError:
            # The user cancelled
            await self.cleanup_stream(thread_id)
            raise

    async def cleanup_stream(self, thread_id):
        """Release stream resources"""
        # Cancel the in-flight run
        pass

# FastAPI integration
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
service = ProductionStreamService()

@app.websocket("/ws/chat/{user_id}")
async def websocket_chat(websocket: WebSocket, user_id: str):
    """WebSocket streaming chat"""
    await websocket.accept()

    try:
        while True:
            # Receive the user's message
            message = await websocket.receive_text()

            # Stream the response
            async for token in service.stream_chat(
                user_id,
                message,
                thread_id=f"user_{user_id}"
            ):
                await websocket.send_text(token)

            # Send the completion marker
            await websocket.send_text("[DONE]")

    except WebSocketDisconnect:
        print(f"User {user_id} disconnected")

Optimization 5: Caching and Preloading

python
import asyncio

class CachedStreamService:
    """Streaming with a response cache"""

    def __init__(self):
        self.response_cache = {}

    def get_cached_response(self, message_hash):
        """Look up a cached response
        (a plain dict lookup; lru_cache here would memoize misses as None forever)"""
        return self.response_cache.get(message_hash)

    async def stream_with_cache(self, message):
        """Streaming with caching"""
        message_hash = hash(message)

        # Check the cache
        cached = self.get_cached_response(message_hash)
        if cached:
            # Replay the cached response as a stream
            for token in cached.split():
                yield token + " "
                await asyncio.sleep(0.05)  # simulated pacing
            return

        # Generate a fresh response
        full_response = ""
        async for event in graph.astream_events(...):
            if event["event"] == "on_chat_model_stream":
                token = event["data"]["chunk"].content
                if token:
                    full_response += token
                    yield token

        # Cache it
        self.response_cache[message_hash] = full_response

性能监控

python
import time
from dataclasses import dataclass

@dataclass
class StreamMetrics:
    """流式输出性能指标"""
    user_id: str
    start_time: float
    end_time: float = None
    token_count: int = 0
    total_bytes: int = 0
    
    @property
    def duration(self):
        if self.end_time:
            return self.end_time - self.start_time
        return time.time() - self.start_time
    
    @property
    def tokens_per_second(self):
        duration = self.duration
        return self.token_count / duration if duration > 0 else 0

class MonitoredStreamService:
    """带监控的流式服务"""
    
    def __init__(self):
        self.metrics = []
    
    async def monitored_stream(self, user_id, graph, input_data, config):
        """带性能监控的流式输出"""
        metric = StreamMetrics(user_id=user_id, start_time=time.time())
        
        try:
            async for event in graph.astream_events(input_data, config, version="v2"):
                if event["event"] == "on_chat_model_stream":
                    token = event["data"]["chunk"].content
                    
                    if token:
                        metric.token_count += 1
                        metric.total_bytes += len(token.encode('utf-8'))
                        yield token
        
        finally:
            metric.end_time = time.time()
            self.metrics.append(metric)
            
            # Log the metrics
            print(f"\n📊 Performance metrics:")
            print(f"   User: {metric.user_id}")
            print(f"   Duration: {metric.duration:.2f}s")
            print(f"   Tokens: {metric.token_count}")
            print(f"   Speed: {metric.tokens_per_second:.1f} tokens/s")

Question 10: Design a Complete Human-in-the-Loop System Architecture

Expand to view the full walkthrough

System Architecture Diagram

┌──────────────────────────────────────────────────────────────┐
│                         Client Layer                          │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐      │
│  │ Web UI   │  │ Mobile   │  │  API     │  │ CLI Tool │      │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘      │
└───────┼─────────────┼─────────────┼─────────────┼────────────┘
        │             │             │             │
        └─────────────┴─────────────┴─────────────┘
                        │
        ┌───────────────▼────────────────┐
        │   API Gateway (Rate Limiting)  │
        └───────────────┬────────────────┘
                        │
┌───────────────────────▼────────────────────────────────────────┐
│                          Service Layer                          │
│  ┌────────────────────────────────────────────────────────┐    │
│  │                     Stream Manager                     │    │
│  │  - Concurrency control (Semaphore)                     │    │
│  │  - Connection pool management                          │    │
│  │  - Buffering optimization                              │    │
│  └────────────────────────────────────────────────────────┘    │
│  ┌────────────────────────────────────────────────────────┐    │
│  │                    Approval Manager                    │    │
│  │  - Static breakpoints (interrupt_before)               │    │
│  │  - Dynamic breakpoints (NodeInterrupt)                 │    │
│  │  - Approval queue                                      │    │
│  └────────────────────────────────────────────────────────┘    │
│  ┌────────────────────────────────────────────────────────┐    │
│  │                     State Manager                      │    │
│  │  - update_state                                        │    │
│  │  - get_state_history                                   │    │
│  │  - Fork management                                     │    │
│  └────────────────────────────────────────────────────────┘    │
└──────────────────────┬─────────────────────────────────────────┘
                       │
┌──────────────────────▼─────────────────────────────────────────┐
│                     Graph Layer (LangGraph)                     │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐        │
│  │Assistant │  │  Tools   │  │  Router  │  │ Feedback │        │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘        │
└──────────────────────┬─────────────────────────────────────────┘
                       │
┌──────────────────────▼─────────────────────────────────────────┐
│                        Persistence Layer                        │
│  ┌────────────────┐  ┌─────────────────┐  ┌────────────────┐   │
│  │  Checkpointer  │  │  Audit Logger   │  │  Metrics DB    │   │
│  │  (PostgreSQL)  │  │ (Elasticsearch) │  │  (Prometheus)  │   │
│  └────────────────┘  └─────────────────┘  └────────────────┘   │
└────────────────────────────────────────────────────────────────┘

Complete Implementation

python
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
import asyncio
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.errors import NodeInterrupt

@dataclass
class ApprovalRequest:
    """审批请求"""
    request_id: str
    thread_id: str
    tool_name: str
    tool_args: dict
    risk_level: str  # "low", "medium", "high"
    created_at: datetime
    status: str  # "pending", "approved", "rejected"

class HITLState(MessagesState):
    """Graph state: conversation messages plus risk/approval bookkeeping.

    MessagesState only defines `messages`, so the extra channels written by
    risk assessment and approval must be declared here.
    """
    risk_level: str
    pending_tool: dict
    modified_params: dict

class HumanInTheLoopSystem:
    """A complete Human-in-the-Loop system."""
    
    def __init__(
        self,
        postgres_connection: str,
        max_concurrent_streams: int = 100,
        buffer_size: int = 5
    ):
        # Component initialization
        # NOTE: the exact PostgresSaver setup depends on your
        # langgraph-checkpoint-postgres version; recent releases expose
        # PostgresSaver.from_conn_string(...) instead of this constructor.
        self.checkpointer = PostgresSaver(connection_string=postgres_connection)
        self.stream_semaphore = asyncio.Semaphore(max_concurrent_streams)
        self.buffer_size = buffer_size
        
        # Approval queue
        self.approval_queue: Dict[str, ApprovalRequest] = {}
        
        # Performance counters
        self.metrics = {
            "total_requests": 0,
            "approved": 0,
            "rejected": 0,
            "auto_approved": 0
        }
        
        # Build the graph
        self.graph = self._build_graph()
    
    def _build_graph(self):
        """构建 LangGraph"""
        builder = StateGraph(MessagesState)
        
        # Add nodes
        builder.add_node("assistant", self._assistant_node)
        builder.add_node("risk_assessment", self._risk_assessment)
        builder.add_node("approval_gate", self._approval_gate)
        builder.add_node("tools", self._tools_node)
        
        # Add edges
        builder.add_edge(START, "assistant")
        builder.add_edge("assistant", "risk_assessment")
        builder.add_conditional_edges(
            "risk_assessment",
            self._route_after_risk,
            {
                "auto_approve": "tools",
                "need_approval": "approval_gate"
            }
        )
        builder.add_edge("approval_gate", "tools")
        builder.add_edge("tools", "assistant")
        
        # Compile with a static breakpoint before the approval gate
        return builder.compile(
            interrupt_before=["approval_gate"],
            checkpointer=self.checkpointer
        )
    
    def _assistant_node(self, state):
        """助手节点"""
        # LLM 调用
        pass
    
    def _risk_assessment(self, state):
        """风险评估节点"""
        last_message = state["messages"][-1]
        
        if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
            tool_call = last_message.tool_calls[0]
            
            # Score the pending tool call
            risk_level = self._calculate_risk(tool_call)
            
            return {
                "risk_level": risk_level,
                "pending_tool": tool_call
            }
        
        return {"risk_level": "none"}
    
    def _calculate_risk(self, tool_call) -> str:
        """计算风险等级"""
        high_risk_tools = ["delete", "payment", "refund"]
        medium_risk_tools = ["update", "modify"]
        
        if tool_call['name'] in high_risk_tools:
            return "high"
        elif tool_call['name'] in medium_risk_tools:
            return "medium"
        else:
            return "low"
    
    def _route_after_risk(self, state):
        """风险评估后路由"""
        risk = state.get("risk_level", "low")
        
        if risk in ["high", "medium"]:
            return "need_approval"
        else:
            self.metrics["auto_approved"] += 1
            return "auto_approve"
    
    def _approval_gate(self, state):
        """审批门(空节点,用于断点)"""
        pass
    
    def _tools_node(self, state):
        """工具执行节点"""
        # 执行工具
        pass
    
    async def stream_with_hitl(
        self,
        user_id: str,
        message: str,
        thread_id: str
    ):
        """带 HITL 的流式执行"""
        
        async with self.stream_semaphore:  # concurrency control
            self.metrics["total_requests"] += 1
            
            # Per-thread configuration
            config = {"configurable": {"thread_id": thread_id}}
            input_data = {"messages": [{"role": "user", "content": message}]}
            
            # Token buffer
            buffer = []
            
            try:
                async for event in self.graph.astream_events(
                    input_data,
                    config,
                    version="v2"
                ):
                    # Token stream
                    if event["event"] == "on_chat_model_stream":
                        token = event["data"]["chunk"].content
                        
                        if token:
                            buffer.append(token)
                            
                            # Flush in batches to reduce I/O
                            if len(buffer) >= self.buffer_size:
                                yield "".join(buffer)
                                buffer.clear()
                
                # Flush any remaining tokens
                if buffer:
                    yield "".join(buffer)
                
                # Check whether execution paused at the approval gate
                state = self.graph.get_state(config)
                
                if state.tasks and state.tasks[0].interrupts:
                    # Create an approval request
                    request = self._create_approval_request(
                        thread_id,
                        state
                    )
                    
                    # Surface the pending approval to the caller
                    yield f"\n\n⏸️  Waiting for approval (request ID: {request.request_id})"
                    
            except Exception as e:
                yield f"\n\n❌ 错误: {e}"
    
    def _create_approval_request(
        self,
        thread_id: str,
        state
    ) -> ApprovalRequest:
        """创建审批请求"""
        import uuid
        
        pending_tool = state.values.get('pending_tool')
        risk_level = state.values.get('risk_level', 'medium')
        
        request = ApprovalRequest(
            request_id=str(uuid.uuid4()),
            thread_id=thread_id,
            tool_name=pending_tool['name'],
            tool_args=pending_tool['args'],
            risk_level=risk_level,
            created_at=datetime.now(),
            status="pending"
        )
        
        self.approval_queue[request.request_id] = request
        return request
    
    async def approve_request(
        self,
        request_id: str,
        approved: bool,
        modified_args: Optional[dict] = None
    ):
        """审批请求"""
        request = self.approval_queue.get(request_id)
        
        if not request:
            raise ValueError(f"Request {request_id} not found")
        
        if approved:
            request.status = "approved"
            self.metrics["approved"] += 1
            
            # Apply any human modifications, attributed to the approval gate
            config = {"configurable": {"thread_id": request.thread_id}}
            
            if modified_args:
                self.graph.update_state(
                    config,
                    {"modified_params": modified_args},
                    as_node="approval_gate"
                )
            
            # Resume from the breakpoint (input=None continues from current state)
            result = []
            async for event in self.graph.astream_events(None, config, version="v2"):
                if event["event"] == "on_chat_model_stream":
                    token = event["data"]["chunk"].content
                    if token:
                        result.append(token)
            
            return "".join(result)
        
        else:
            request.status = "rejected"
            self.metrics["rejected"] += 1
            return "操作已拒绝"
    
    def get_metrics(self) -> dict:
        """获取系统指标"""
        return {
            **self.metrics,
            "approval_rate": self.metrics["approved"] / max(self.metrics["total_requests"], 1),
            "pending_approvals": len([r for r in self.approval_queue.values() if r.status == "pending"])
        }

# FastAPI integration example
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException

app = FastAPI()
hitl_system = HumanInTheLoopSystem(
    postgres_connection="postgresql://user:pass@localhost/db",
    max_concurrent_streams=100
)

@app.websocket("/ws/chat/{user_id}")
async def websocket_chat(websocket: WebSocket, user_id: str):
    """WebSocket 聊天端点"""
    await websocket.accept()
    
    try:
        while True:
            message = await websocket.receive_text()
            
            async for token in hitl_system.stream_with_hitl(
                user_id,
                message,
                thread_id=f"user_{user_id}"
            ):
                await websocket.send_text(token)
    
    except WebSocketDisconnect:
        pass

@app.post("/approve/{request_id}")
async def approve_request(
    request_id: str,
    approved: bool,
    modified_args: Optional[dict] = None
):
    """审批端点"""
    try:
        result = await hitl_system.approve_request(
            request_id,
            approved,
            modified_args
        )
        return {"status": "success", "result": result}
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

@app.get("/metrics")
async def get_metrics():
    """获取系统指标"""
    return hitl_system.get_metrics()
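
Once the service is running, approving a paused request is a plain HTTP call. Below is a minimal client sketch using the `requests` package; the host, port, and request ID are placeholders for whatever your deployment and the "Waiting for approval" message actually report.

python
import requests

BASE = "http://localhost:8000"  # hypothetical deployment address
request_id = "..."  # taken from the "Waiting for approval" message

resp = requests.post(
    f"{BASE}/approve/{request_id}",
    params={"approved": True},  # approve; False would reject
    json={"amount": 50},        # optional modified_args (illustrative)
)
print(resp.json())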

🎉 Review Complete!

Knowledge Self-Assessment

After completing the 10 questions above, assess your level of mastery:

  • [ ] Breakpoints: can independently implement a tool-call approval flow
  • [ ] Dynamic Breakpoints: can interrupt conditionally based on business logic
  • [ ] Edit State: understand the as_node and Message ID mechanisms
  • [ ] Streaming: can implement production-grade token-level streaming
  • [ ] Time Travel: have mastered history browsing, replaying, and forking

Next Steps

Choose your next direction based on how well you've mastered the material:

Consolidate the basics (mastery < 70%):

  • Revisit the sections you haven't mastered
  • Complete the hands-on exercises in each lesson
  • Build small sample projects to verify your understanding

Intermediate path (mastery 70%-90%):

  • Study Module-5: advanced graph patterns (parallelism, subgraphs, dynamic routing)
  • Study Module-6: production deployment and optimization
  • Build a medium-sized end-to-end project

Expert path (mastery > 90%):

  • Study Module-7: a large-scale capstone project
  • Read the LangGraph source code
  • Contribute to open source or write technical blog posts

Practice Suggestions

Project 1: Intelligent Customer Service System (Intermediate)

  • Requirements: order lookup, refund approval, complaint handling
  • Techniques: Breakpoints + Edit State + Streaming (starter sketch below)
  • Estimated time: 3-5 days
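
A possible starting skeleton for Project 1; the node names and linear routing here are assumptions, and the intent classification and tool implementations are left as stubs:

python
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver

def assistant(state: MessagesState):
    pass  # LLM call: classify intent (order lookup / refund / complaint)

def refund(state: MessagesState):
    pass  # sensitive tool: execute the refund

builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("refund", refund)
builder.add_edge(START, "assistant")
builder.add_edge("assistant", "refund")  # real project: conditional routing
builder.add_edge("refund", END)

# Static breakpoint before the sensitive node; after human review,
# inject feedback with update_state(...) and resume with stream(None, config)
graph = builder.compile(interrupt_before=["refund"], checkpointer=MemorySaver())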

Project 2: Content Moderation Platform (Advanced)

  • Requirements: multi-dimensional risk assessment, conditional approval, audit logs
  • Techniques: Dynamic Breakpoints + a complete audit system
  • Estimated time: 5-7 days

Project 3: AI Decision Debugging Tool (Expert)

  • Requirements: history browsing, replay, A/B testing, performance monitoring
  • Techniques: Time Travel + all HITL techniques
  • Estimated time: 7-10 days

Reference Resources

Further reading:

  • A deep dive into Python async/await
  • WebSocket real-time communication
  • PostgreSQL performance tuning

Congratulations on completing the Module-4 review! 🎊

You now have the full skill set for building controllable, trustworthy, and reliable Human-in-the-Loop AI systems. These techniques are more than tools: they form a complete design philosophy for trustworthy AI. In future AI application development, Human-in-the-Loop will be a key marker separating good systems from great ones.

Keep going, and press on toward LangGraph mastery! 🚀

Released under the MIT License. Content copyright belongs to the author.