Skip to content

Module-2 小结和复习:核心架构模式掌握指南

来自图灵奖获得者的寄语

恭喜你完成了 Module-2 的学习!你已经掌握了 LangGraph 的六大核心架构模式。这些模式是构建生产级 AI Agent 的基石。正如计算机科学中的设计模式一样,理解何时使用哪种模式比记住语法更重要。本复习文档通过 15 个深度问题,帮助你从"知道"进化到"精通"。

记住:伟大的系统架构师不是记住了所有模式,而是能够在正确的场景选择正确的模式。


📋 本章核心知识回顾

学习地图

Module-2: 核心架构模式
├─ 2.1 Simple Graph (基础图)
│  └─ 状态、节点、边的基本概念
├─ 2.2 Chain (链式架构)
│  └─ 消息系统、工具绑定、Reducer
├─ 2.3 Router (路由架构)
│  └─ 条件边、动态路由、工具调用
├─ 2.4 Agent (智能体架构)
│  └─ ReAct 循环、多步推理、工具执行
├─ 2.5 Agent Memory (记忆管理)
│  └─ Checkpointer、Thread、状态持久化
└─ 2.6 Deployment (生产部署)
   └─ LangGraph Cloud、Studio、SDK

六大模式速查表

模式核心特征适用场景关键技术
Simple Graph3-5 节点,条件分支学习基础、简单流程TypedDict, Literal
Chain线性流程,工具绑定单次工具调用add_messages, bind_tools
RouterLLM 决策路由智能分类、动态选择tools_condition, ToolNode
AgentReAct 循环推理多步任务、复杂推理循环边、工具反馈
Agent Memory状态持久化多轮对话、上下文连续Checkpointer, Thread
Deployment生产环境部署真实用户服务LangGraph Cloud, SDK

📚 术语表

术语名称LangGraph 定义和解读Python 定义和说明重要程度
StateGraph核心图构建器,管理有状态工作流的创建和执行接受 State 类型参数的泛型类,提供节点和边的添加方法⭐⭐⭐⭐⭐
MessagesState专为对话设计的预定义状态类包含 messages 字段并自动使用 add_messages reducer⭐⭐⭐⭐⭐
add_messages智能消息合并 reducer,支持追加和去重接收旧消息列表和新消息,返回合并后的列表⭐⭐⭐⭐⭐
Reducer控制状态更新方式的函数,实现非覆盖式合并函数签名为 (old_value, new_value) -> merged_value⭐⭐⭐⭐⭐
Conditional Edge根据状态动态选择下一节点的智能路由边通过 add_conditional_edges 添加,需提供返回节点名的函数⭐⭐⭐⭐⭐
ToolNode自动执行工具调用的预构建节点解析 AIMessage.tool_calls,调用函数,返回 ToolMessage⭐⭐⭐⭐⭐
Checkpointer状态持久化机制,在节点执行后自动保存状态接口类,MemorySaver/PostgresSaver 是其实现⭐⭐⭐⭐⭐
Thread独立的对话会话容器,通过 thread_id 隔离状态每个 thread 维护独立的 checkpoint 历史⭐⭐⭐⭐⭐
Agent支持多步推理和工具调用的智能体架构通过循环边实现 ReAct 模式(推理-行动-观察)⭐⭐⭐⭐⭐
ReActReasoning + Acting,Agent 的经典实现模式LLM 基于工具结果持续推理直到得出最终答案⭐⭐⭐⭐⭐
LangGraph Cloud托管部署服务,从 GitHub 自动部署到生产环境提供唯一 URL、监控追踪、环境变量管理⭐⭐⭐⭐⭐
LangGraph SDKPython 客户端库,用于程序化调用部署的图提供异步 API 访问 assistants、threads、runs⭐⭐⭐⭐⭐

🎯 复习题目列表

本复习包含 15 个渐进式问题,覆盖知识理解、代码实现、架构设计三个层次:

基础理解题(1-5)

  1. LangGraph 状态管理的核心机制是什么?
  2. Router 和 Agent 的本质区别是什么?
  3. 为什么需要 add_messages Reducer?
  4. Checkpointer 的工作原理是什么?
  5. 条件边(Conditional Edge)如何决定路由?

代码实现题(6-10) 6. 如何实现一个带多个工具的 Router? 7. 如何让 Agent 避免无限循环? 8. 如何实现跨会话的对话记忆? 9. 如何在本地测试后部署到云端? 10. 如何处理工具调用失败的情况?

架构设计题(11-15) 11. 设计一个客服机器人:应该选择哪种架构模式? 12. 如何优化 Agent 的 Token 使用? 13. 如何设计多用户并发的 Agent 系统? 14. Simple Graph vs Chain vs Agent:如何选择? 15. 如何设计可扩展的工具系统?


📚 详细问答解析

问题 1: LangGraph 状态管理的核心机制是什么?

💡 点击查看答案

答案:

LangGraph 的状态管理基于 TypedDict + Annotated + Reducer 的三层架构。

1. TypedDict:定义状态结构

python
from typing_extensions import TypedDict

class State(TypedDict):
    messages: list
    count: int
    user_id: str

作用:

  • 提供类型提示和 IDE 支持
  • 定义状态的"骨架"
  • 不强制运行时验证(与 Pydantic 不同)

2. Annotated:附加元数据

python
from typing import Annotated
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]
    #          ^^^^^^^^      ^^^^^^^^^^^^^
    #          基础类型      Reducer 函数
    count: int  # 没有 Annotated = 默认覆盖行为

作用:

  • 为字段添加额外信息(Reducer)
  • 不影响类型检查
  • LangGraph 读取元数据来决定状态更新方式

3. Reducer:控制更新逻辑

默认行为(无 Reducer):

python
# 初始状态
state = {"count": 5}

# 节点返回
node_output = {"count": 10}

# 结果:覆盖
new_state = {"count": 10}  # 5 被替换

使用 Reducer(add_messages):

python
# 初始状态
state = {"messages": [msg1, msg2]}

# 节点返回
node_output = {"messages": [msg3]}

# 结果:追加
new_state = {"messages": [msg1, msg2, msg3]}

4. add_messages 的智能特性

python
from langgraph.graph.message import add_messages

# 特性 1:追加新消息
state = {"messages": [HumanMessage("Hi")]}
update = {"messages": [AIMessage("Hello")]}
# 结果:[HumanMessage("Hi"), AIMessage("Hello")]

# 特性 2:基于 ID 更新(去重)
state = {"messages": [AIMessage("Thinking...", id="msg1")]}
update = {"messages": [AIMessage("Answer is 42", id="msg1")]}
# 结果:[AIMessage("Answer is 42", id="msg1")]  ← 替换而非追加

5. 自定义 Reducer

python
import operator

class State(TypedDict):
    # 数字累加
    score: Annotated[int, operator.add]

    # 列表合并
    items: Annotated[list, operator.add]

    # 自定义:只保留最近 N 条
    recent_logs: Annotated[list, lambda old, new: (old + new)[-10:]]

执行效果:

python
# score 使用 operator.add
state = {"score": 5}
update = {"score": 3}
# 结果:{"score": 8}  ← 5 + 3

# items 使用 operator.add(列表拼接)
state = {"items": [1, 2]}
update = {"items": [3, 4]}
# 结果:{"items": [1, 2, 3, 4]}

# recent_logs 使用自定义 lambda
state = {"recent_logs": [1,2,3,4,5,6,7,8,9,10]}
update = {"recent_logs": [11, 12]}
# 结果:{"recent_logs": [3,4,5,6,7,8,9,10,11,12]}  ← 只保留最后 10 个

6. 核心原理总结

python
# 状态更新流程
初始状态 → 节点执行 → 返回更新 → Reducer 合并 → 新状态

# 伪代码
def update_state(old_state, node_output):
    new_state = {}
    for key, value in node_output.items():
        if has_reducer(key):
            new_state[key] = reducer(old_state[key], value)
        else:
            new_state[key] = value  # 直接覆盖
    return {**old_state, **new_state}

7. 最佳实践

python
# ✅ 好的设计
class ChatState(TypedDict):
    messages: Annotated[list, add_messages]  # 对话历史:追加
    user_id: str                              # 用户标识:覆盖
    session_count: Annotated[int, operator.add]  # 会话计数:累加

# ❌ 不好的设计
class BadState(TypedDict):
    messages: list  # 没有 Reducer,每次覆盖 → 丢失历史
    all_data: dict  # 太宽泛,难以维护

关键要点

  • TypedDict:定义"什么"(数据结构)
  • Annotated:定义"如何"(更新方式)
  • Reducer:实现"行为"(合并逻辑)
  • add_messages:专为对话设计的智能 Reducer

问题 2: Router 和 Agent 的本质区别是什么?

💡 点击查看答案

答案:

Router 和 Agent 的本质区别在于 工具结果的流向决策次数

1. 架构流程对比

Router 架构:

用户输入 → LLM 决策 → 工具执行 → 直接返回用户

         调用一次工具
         做一次决策

Agent 架构:

用户输入 → LLM 决策 → 工具执行 → 返回 LLM → 继续决策 → ...
                ↑______________________|
                        循环反馈
         可能调用多次工具
         做多次决策

2. 代码结构对比

Router 的图结构:

python
from langgraph.prebuilt import tools_condition, ToolNode

builder = StateGraph(MessagesState)
builder.add_node("llm", call_llm)
builder.add_node("tools", ToolNode([multiply]))

builder.add_edge(START, "llm")
builder.add_conditional_edges("llm", tools_condition)
builder.add_edge("tools", END)  # ← 关键:工具后直接结束

graph = builder.compile()

执行流程:

START → llm → [有工具调用] → tools → END
              [无工具调用] → END

Agent 的图结构:

python
builder = StateGraph(MessagesState)
builder.add_node("agent", call_llm)
builder.add_node("tools", ToolNode([multiply]))

builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent")  # ← 关键:工具后回到 agent,形成循环

graph = builder.compile()

执行流程:

START → agent → [有工具调用] → tools → agent → [继续判断] → ...
                [无工具调用] → END

3. 实际案例对比

场景: 计算 (3 + 4) × 2 ÷ 5

Router 的执行:

python
# 第 1 次调用图
输入: "Calculate (3 + 4) × 2 ÷ 5"
LLM: 调用 add(3, 4)
工具: 返回 7
输出: ToolMessage(content="7")
# 结束 ❌ 无法继续计算

# 需要手动第 2 次调用
输入: "Now multiply 7 by 2"
LLM: 调用 multiply(7, 2)
工具: 返回 14
输出: ToolMessage(content="14")
# 结束 ❌ 还是无法完成

Agent 的执行:

python
输入: "Calculate (3 + 4) × 2 ÷ 5"

# 循环 1
LLM: "需要先计算 3 + 4" → 调用 add(3, 4)
工具: 返回 7
→ 回到 LLM

# 循环 2
LLM: "得到 7,现在乘以 2" → 调用 multiply(7, 2)
工具: 返回 14
→ 回到 LLM

# 循环 3
LLM: "得到 14,现在除以 5" → 调用 divide(14, 5)
工具: 返回 2.8
→ 回到 LLM

# 循环 4
LLM: "所有计算完成,答案是 2.8"
输出: AIMessage(content="The answer is 2.8")
→ 结束 ✅

4. 工具结果的流向

Router:

python
def tools_node(state):
    # 执行工具
    result = tool.invoke(...)
    return {"messages": [ToolMessage(result)]}
    # ↓
    # 流向:直接到 END,用户可见

Agent:

python
def tools_node(state):
    # 执行工具
    result = tool.invoke(...)
    return {"messages": [ToolMessage(result)]}
    # ↓
    # 流向:回到 agent 节点,LLM 分析结果

5. 决策次数对比

Router:单次决策

python
用户问题 → [LLM 思考 1 次] → 调用工具/直接回答 → 结束

Agent:多次决策

python
用户问题 → [LLM 思考 1] → 工具 → [LLM 思考 2] → 工具 → ... → [LLM 思考 N] → 回答

6. 适用场景对比

场景推荐架构原因
简单问答("今天天气")Router单次查询即可
信息检索("搜索论文")Router一次搜索返回
数学计算(多步骤)Agent需要基于中间结果推理
研究助手(搜索→阅读→总结)Agent需要多步骤协作
客服(查订单→回答)Router单次工具调用
数据分析(查询→计算→可视化)Agent需要连续操作

7. 代码对比总结

唯一的区别:一条边

python
# Router
builder.add_edge("tools", END)  # 工具 → 结束

# Agent
builder.add_edge("tools", "agent")  # 工具 → 回到 agent(循环)

这一条边的差异,导致了:

  • ✅ Router:简单、快速、可预测
  • ✅ Agent:灵活、强大、智能
  • ❌ Router:无法多步推理
  • ❌ Agent:可能循环、成本高

关键要点

  • Router = 单次决策 + 工具直达用户
  • Agent = 多次决策 + 工具反馈 LLM
  • 区别在于:是否有从 tools 到 agent 的边
  • 选择标准:任务是否需要多步推理

问题 3: 为什么需要 add_messages Reducer?

💡 点击查看答案

答案:

add_messages 解决了对话系统中 状态覆盖导致历史丢失 的核心问题。

1. 问题演示:没有 Reducer 会发生什么

python
# 错误的状态定义(没有 Reducer)
class State(TypedDict):
    messages: list  # 默认行为:覆盖

# 初始状态
state = {"messages": [HumanMessage("Hi")]}

# 节点 1 执行
def node1(state):
    return {"messages": [AIMessage("Hello!")]}

# 更新后的状态
# ❌ 预期:[HumanMessage("Hi"), AIMessage("Hello!")]
# ✅ 实际:[AIMessage("Hello!")]  ← HumanMessage 丢失了!

问题分析:

  • LangGraph 默认使用 字典更新语义dict.update()
  • 相同 key 的值会被完全替换,而不是合并
  • 对话历史被覆盖 → LLM 失去上下文 → 无法理解指代词

2. 真实场景的灾难

python
# 第一轮对话
user: "我的订单号是 12345"
agent: "好的,我看到您的订单 12345"

# 第二轮对话(没有 Reducer)
state = {"messages": [AIMessage("好的,我看到您的订单 12345")]}
# ❌ 用户的消息丢失了!

user: "那个订单发货了吗?"
agent: "请问您的订单号是多少?"  ← 忘记了刚才说的 12345

3. add_messages 的作用

python
from langgraph.graph.message import add_messages
from typing import Annotated

class State(TypedDict):
    messages: Annotated[list, add_messages]
    #                   ^^^^  ^^^^^^^^^^^^^
    #                   类型  Reducer 函数

执行效果:

python
# 初始状态
state = {"messages": [HumanMessage("Hi")]}

# 节点返回
update = {"messages": [AIMessage("Hello!")]}

# add_messages 自动合并
new_state = {"messages": [
    HumanMessage("Hi"),      ← 保留
    AIMessage("Hello!")      ← 追加
]}

4. add_messages 的智能特性

特性 1:自动追加

python
messages = [msg1, msg2]
add_messages(messages, [msg3])
# 结果:[msg1, msg2, msg3]

特性 2:基于 ID 更新(去重)

python
messages = [AIMessage("Loading...", id="response-1")]
add_messages(messages, [AIMessage("Done!", id="response-1")])
# 结果:[AIMessage("Done!", id="response-1")]  ← 替换而非重复

特性 3:支持多种输入格式

python
# 单个消息
add_messages(messages, AIMessage("Hi"))

# 消息列表
add_messages(messages, [msg1, msg2])

# 混合类型
add_messages(messages, [HumanMessage("Q"), AIMessage("A")])

5. 没有 Reducer 的后果对比

场景: 三轮对话

没有 add_messages:

python
# 轮次 1
state = {"messages": [HumanMessage("Hi")]}
update = {"messages": [AIMessage("Hello")]}
# 状态:[AIMessage("Hello")]  ❌ Hi 丢失

# 轮次 2
state = {"messages": [AIMessage("Hello")]}
update = {"messages": [HumanMessage("How are you?")]}
# 状态:[HumanMessage("How are you?")]  ❌ Hello 丢失

# 轮次 3
state = {"messages": [HumanMessage("How are you?")]}
update = {"messages": [AIMessage("I'm good")]}
# 状态:[AIMessage("I'm good")]  ❌ 问题丢失

# LLM 只能看到最后一条消息,无法理解上下文!

有 add_messages:

python
# 轮次 1
state = {"messages": [HumanMessage("Hi")]}
update = {"messages": [AIMessage("Hello")]}
# 状态:[HumanMessage("Hi"), AIMessage("Hello")]  ✅

# 轮次 2
state = {"messages": [HumanMessage("Hi"), AIMessage("Hello")]}
update = {"messages": [HumanMessage("How are you?")]}
# 状态:[HumanMessage("Hi"), AIMessage("Hello"), HumanMessage("How are you?")]  ✅

# 轮次 3
state = {"messages": [..., HumanMessage("How are you?")]}
update = {"messages": [AIMessage("I'm good")]}
# 状态:[..., HumanMessage("How are you?"), AIMessage("I'm good")]  ✅

# LLM 可以看到完整对话历史!

6. add_messages 的内部实现(简化版)

python
def add_messages(existing: list, new: list | BaseMessage) -> list:
    """智能合并消息列表"""
    # 1. 标准化输入
    if not isinstance(new, list):
        new = [new]

    # 2. 构建 ID 映射
    id_map = {msg.id: i for i, msg in enumerate(existing) if msg.id}

    # 3. 合并逻辑
    result = existing.copy()
    for msg in new:
        if msg.id and msg.id in id_map:
            # 有 ID 且已存在 → 更新
            result[id_map[msg.id]] = msg
        else:
            # 无 ID 或不存在 → 追加
            result.append(msg)

    return result

7. 其他常用 Reducer

python
import operator

class State(TypedDict):
    # 数字累加
    total_cost: Annotated[float, operator.add]

    # 列表拼接
    search_results: Annotated[list, operator.add]

    # 集合合并
    visited_pages: Annotated[set, operator.or_]

    # 自定义:保留最大值
    max_score: Annotated[float, max]

8. 何时不需要 Reducer

python
class State(TypedDict):
    messages: Annotated[list, add_messages]  # 需要:对话历史
    user_id: str                              # 不需要:固定标识
    current_page: int                         # 不需要:当前值覆盖即可
    is_authenticated: bool                    # 不需要:状态标记

原则:

  • 需要累积的数据 → 使用 Reducer
  • 需要替换的数据 → 不使用 Reducer

关键要点

  • 问题: 默认字典更新会覆盖整个列表
  • 后果: 对话历史丢失,LLM 失去上下文
  • 解决: add_messages 智能追加/更新消息
  • 本质: 将"覆盖语义"改为"合并语义"

问题 4: Checkpointer 的工作原理是什么?

💡 点击查看答案

答案:

Checkpointer 是 LangGraph 的 自动快照系统,在每个节点执行后保存状态,实现多轮对话和时间旅行。

1. 核心概念

Checkpoint(检查点): 图执行过程中某个时刻的完整状态快照

python
checkpoint = {
    "state": {"messages": [msg1, msg2]},  # 状态数据
    "metadata": {
        "node": "agent",                   # 当前节点
        "step": 2,                         # 执行步数
        "timestamp": "2024-03-15T10:30:00"
    },
    "parent_id": "checkpoint-1",          # 父检查点 ID
    "id": "checkpoint-2"                  # 当前检查点 ID
}

2. 工作流程

没有 Checkpointer:

python
graph = builder.compile()  # 无状态持久化

# 第一次调用
result1 = graph.invoke({"messages": [msg1]})
# 状态:存在于内存,调用结束后丢失

# 第二次调用
result2 = graph.invoke({"messages": [msg2]})
# 状态:全新开始,不记得 msg1

有 Checkpointer:

python
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# 第一次调用
config = {"configurable": {"thread_id": "user-123"}}
result1 = graph.invoke({"messages": [msg1]}, config)
# ✅ 状态自动保存到 checkpointer

# 第二次调用(同一 thread_id)
result2 = graph.invoke({"messages": [msg2]}, config)
# ✅ 自动加载之前的状态,msg1 还在!

3. 保存时机

Checkpointer 在 每个节点执行后 自动保存:

python
graph = StateGraph(State)
graph.add_node("node1", node1_func)
graph.add_node("node2", node2_func)
graph.add_edge(START, "node1")
graph.add_edge("node1", "node2")
graph.add_edge("node2", END)

graph_with_memory = graph.compile(checkpointer=memory)

执行流程:

START

node1 执行

💾 Checkpoint 1: {"state": {...}, "node": "node1", "step": 1}

node2 执行

💾 Checkpoint 2: {"state": {...}, "node": "node2", "step": 2}

END

4. Thread:状态容器

Thread(线程) 是隔离不同会话状态的标识符。

python
# 用户 A 的对话
config_a = {"configurable": {"thread_id": "user-a"}}
graph.invoke(input, config_a)
# 保存到 thread_id="user-a"

# 用户 B 的对话(完全独立)
config_b = {"configurable": {"thread_id": "user-b"}}
graph.invoke(input, config_b)
# 保存到 thread_id="user-b"

Thread 结构:

Checkpointer(全局)
├─ Thread: user-a
│  ├─ Checkpoint 1 (step 1)
│  ├─ Checkpoint 2 (step 2)
│  └─ Checkpoint 3 (step 3)
├─ Thread: user-b
│  ├─ Checkpoint 1
│  └─ Checkpoint 2
└─ Thread: user-c
   └─ Checkpoint 1

5. 实际案例:多轮对话

python
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "conversation-1"}}

# 轮次 1
graph.invoke({"messages": [HumanMessage("3 + 4 = ?")]}, config)
# 💾 保存:{"messages": [HumanMessage("3 + 4 = ?"), AIMessage("7")]}

# 轮次 2(几分钟后)
graph.invoke({"messages": [HumanMessage("乘以 2")]}, config)
# ✅ 自动加载之前的状态
# 输入变成:{"messages": [
#     HumanMessage("3 + 4 = ?"),
#     AIMessage("7"),
#     HumanMessage("乘以 2")  ← 新消息追加
# ]}
# 💾 保存:包含所有历史的新状态

6. Checkpointer 类型对比

类型存储位置持久化适用场景优点缺点
MemorySaver进程内存开发测试快速、简单重启丢失
SqliteSaverSQLite 文件单机生产持久化、本地不支持分布式
PostgresSaverPostgreSQL分布式生产可扩展、并发需要数据库
RedisSaverRedis高性能生产低延迟、高吞吐需要 Redis

代码示例:

python
# 开发环境
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()

# 生产环境
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/db"
)

graph = builder.compile(checkpointer=checkpointer)

7. 时间旅行:回溯到历史状态

python
# 运行多步
config = {"configurable": {"thread_id": "thread-1"}}
graph.invoke(input1, config)  # Checkpoint 1
graph.invoke(input2, config)  # Checkpoint 2
graph.invoke(input3, config)  # Checkpoint 3

# 获取所有检查点
checkpoints = list(graph.checkpointer.list(config))
for cp in checkpoints:
    print(f"Step {cp.metadata['step']}: {cp.state}")

# 回溯到 Checkpoint 2
config_with_checkpoint = {
    "configurable": {
        "thread_id": "thread-1",
        "checkpoint_id": checkpoints[1].id  # 第 2 个检查点
    }
}
graph.invoke(input4, config_with_checkpoint)
# 从 Checkpoint 2 的状态继续执行

8. Checkpointer 的内部实现(简化)

python
class MemorySaver:
    def __init__(self):
        self.storage = {}  # {thread_id: [checkpoint1, checkpoint2, ...]}

    def save(self, config, state, metadata):
        """保存检查点"""
        thread_id = config["configurable"]["thread_id"]
        checkpoint = {
            "id": generate_id(),
            "state": state,
            "metadata": metadata,
            "timestamp": now()
        }
        self.storage.setdefault(thread_id, []).append(checkpoint)

    def load(self, config):
        """加载最新检查点"""
        thread_id = config["configurable"]["thread_id"]
        checkpoints = self.storage.get(thread_id, [])
        return checkpoints[-1] if checkpoints else None

    def list(self, config):
        """列出所有检查点"""
        thread_id = config["configurable"]["thread_id"]
        return self.storage.get(thread_id, [])

9. 何时需要 Checkpointer

需要的场景:

  • 多轮对话系统(聊天机器人)
  • 长时间运行的任务(需要中断恢复)
  • 需要上下文连续性("那个"、"它"等指代)
  • 调试和审计(查看历史执行)
  • A/B 测试(对比不同版本)

不需要的场景:

  • 无状态 API(每次请求独立)
  • 批处理任务(不需要记忆)
  • 一次性查询(无需保存历史)

10. 最佳实践

python
# ✅ 好的设计
config = {
    "configurable": {
        "thread_id": f"user-{user_id}-session-{session_id}",
        # 细粒度隔离,便于管理
    }
}

# 定期清理
def cleanup_old_threads():
    cutoff = datetime.now() - timedelta(days=30)
    for thread_id in get_all_threads():
        if thread_last_accessed(thread_id) < cutoff:
            checkpointer.delete_thread({"configurable": {"thread_id": thread_id}})

# ❌ 不好的设计
config = {"configurable": {"thread_id": "global"}}
# 所有用户共享状态 → 数据混乱

关键要点

  • Checkpointer = 自动快照系统
  • 保存时机:每个节点执行后
  • Thread = 隔离不同会话的状态容器
  • 用途:多轮对话、状态持久化、时间旅行
  • 生产环境:使用 PostgresSaver 或 RedisSaver

问题 5: 条件边(Conditional Edge)如何决定路由?

💡 点击查看答案

答案:

条件边通过 条件函数 动态决定下一个执行的节点,实现图的分支逻辑。

1. 基础概念

普通边(Normal Edge): 固定路由

python
builder.add_edge("node_a", "node_b")
# node_a 总是流向 node_b

条件边(Conditional Edge): 动态路由

python
builder.add_conditional_edges("node_a", condition_func)
# node_a 根据 condition_func 的返回值决定下一步

2. 条件函数的结构

python
from typing import Literal

def condition_func(state: State) -> Literal["node_b", "node_c", "__end__"]:
    """
    条件函数:分析状态,返回目标节点名称

    参数:
        state: 当前图状态

    返回:
        str: 目标节点的名称(必须是已添加的节点)
    """
    # 读取状态
    value = state["some_field"]

    # 决策逻辑
    if value > 10:
        return "node_b"
    elif value > 0:
        return "node_c"
    else:
        return "__end__"  # 或使用 END

关键特性:

  • 输入: 当前状态(State 对象)
  • 输出: 节点名称(字符串)
  • 类型提示: Literal 明确所有可能的路由

3. 完整示例:动态路由

python
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from typing import Literal

# 定义状态
class State(TypedDict):
    value: int
    path: list[str]

# 定义节点
def input_node(state: State):
    return {"path": state["path"] + ["input"]}

def process_high(state: State):
    return {"path": state["path"] + ["high"]}

def process_low(state: State):
    return {"path": state["path"] + ["low"]}

def end_node(state: State):
    return {"path": state["path"] + ["end"]}

# 条件函数
def route_based_on_value(state: State) -> Literal["process_high", "process_low", "end"]:
    if state["value"] > 10:
        return "process_high"
    elif state["value"] > 0:
        return "process_low"
    else:
        return "end"

# 构建图
builder = StateGraph(State)
builder.add_node("input", input_node)
builder.add_node("process_high", process_high)
builder.add_node("process_low", process_low)
builder.add_node("end", end_node)

builder.add_edge(START, "input")
builder.add_conditional_edges(
    "input",                  # 源节点
    route_based_on_value     # 条件函数
)
builder.add_edge("process_high", "end")
builder.add_edge("process_low", "end")
builder.add_edge("end", END)

graph = builder.compile()

# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))

# 测试
result1 = graph.invoke({"value": 15, "path": []})
print(result1["path"])  # ['input', 'high', 'end']

result2 = graph.invoke({"value": 5, "path": []})
print(result2["path"])  # ['input', 'low', 'end']

result3 = graph.invoke({"value": -1, "path": []})
print(result3["path"])  # ['input', 'end']

4. tools_condition:预构建的条件函数

LangGraph 提供了常用的 tools_condition

python
from langgraph.prebuilt import tools_condition

# 内部实现(简化)
def tools_condition(state: MessagesState) -> Literal["tools", "__end__"]:
    """检查是否需要调用工具"""
    last_message = state["messages"][-1]

    # 检查最后一条消息是否有工具调用
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"  # 有工具调用 → 路由到 tools 节点
    else:
        return "__end__"  # 无工具调用 → 结束

使用示例:

python
builder.add_conditional_edges("agent", tools_condition)

# 等价于手动写:
def my_tools_condition(state):
    if state["messages"][-1].tool_calls:
        return "tools"
    return END

builder.add_conditional_edges("agent", my_tools_condition)

5. 高级用法:多目标路由映射

方式 1:自动映射(推荐)

python
def condition(state) -> Literal["node_a", "node_b", "node_c"]:
    # 返回值自动匹配节点名称
    return "node_a"

builder.add_conditional_edges("source", condition)
# LangGraph 自动创建路由:source → node_a/node_b/node_c

方式 2:显式映射

python
def condition(state) -> str:
    return "path_1"  # 返回路径名称(而非节点名称)

builder.add_conditional_edges(
    "source",
    condition,
    {
        "path_1": "node_a",  # 路径名 → 节点名
        "path_2": "node_b",
        "default": "node_c"
    }
)

6. 实战案例:客服路由

python
from langchain_core.messages import HumanMessage

class CustomerServiceState(TypedDict):
    messages: list
    category: str

def classify_intent(state: CustomerServiceState) -> Literal["billing", "technical", "general"]:
    """根据用户消息分类意图"""
    last_message = state["messages"][-1].content.lower()

    # 简单关键词匹配(实际应用中使用 LLM 分类)
    if "payment" in last_message or "bill" in last_message:
        return "billing"
    elif "error" in last_message or "bug" in last_message:
        return "technical"
    else:
        return "general"

# 不同部门的处理节点
def billing_handler(state):
    return {"messages": [AIMessage("Billing team will assist you...")]}

def technical_handler(state):
    return {"messages": [AIMessage("Technical support will help...")]}

def general_handler(state):
    return {"messages": [AIMessage("How can I help you today?")]}

# 构建图
builder = StateGraph(CustomerServiceState)
builder.add_node("classifier", lambda s: s)  # 占位节点
builder.add_node("billing", billing_handler)
builder.add_node("technical", technical_handler)
builder.add_node("general", general_handler)

builder.add_edge(START, "classifier")
builder.add_conditional_edges("classifier", classify_intent)
builder.add_edge("billing", END)
builder.add_edge("technical", END)
builder.add_edge("general", END)

graph = builder.compile()

# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))

# 测试
result = graph.invoke({
    "messages": [HumanMessage("I have a payment issue")],
    "category": ""
})
# 路由:classifier → billing → END

7. 条件边的执行流程

python
# 1. 源节点执行
state = source_node(state)

# 2. 调用条件函数
next_node_name = condition_func(state)

# 3. 根据返回值路由
if next_node_name == "node_a":
    state = node_a(state)
elif next_node_name == "node_b":
    state = node_b(state)
elif next_node_name == END:
    return state
else:
    raise ValueError(f"Unknown node: {next_node_name}")

8. 常见错误与调试

错误 1:返回不存在的节点名

python
def bad_condition(state) -> str:
    return "non_existent_node"  # ❌ 这个节点没有被添加

builder.add_conditional_edges("source", bad_condition)
# 运行时错误:Node 'non_existent_node' not found

修复:

python
# 方式 1:确保节点存在
builder.add_node("non_existent_node", some_func)

# 方式 2:使用 Literal 类型检查
def good_condition(state) -> Literal["node_a", "node_b"]:
    return "node_a"  # IDE 会提示可用选项

错误 2:忘记添加所有可能路径的边

python
def condition(state) -> Literal["node_a", "node_b", "node_c"]:
    return "node_a"

builder.add_conditional_edges("source", condition)
builder.add_edge("node_a", END)
builder.add_edge("node_b", END)
# ❌ 忘记添加 node_c → END

# 如果条件返回 "node_c",会报错

调试技巧:

python
def debug_condition(state):
    result = condition(state)
    print(f"Routing from {current_node} to {result}")
    print(f"State: {state}")
    return result

builder.add_conditional_edges("source", debug_condition)

9. 性能优化

python
# ❌ 不好的实践:条件函数中调用 LLM
def expensive_condition(state):
    # 每次路由都调用 LLM,成本高
    intent = llm.invoke("Classify: " + state["message"])
    return intent

# ✅ 好的实践:在节点中完成分类
def classify_node(state):
    intent = llm.invoke("Classify: " + state["message"])
    return {"intent": intent}

def simple_condition(state):
    # 只读取已有状态,无额外开销
    return state["intent"]

builder.add_node("classify", classify_node)
builder.add_conditional_edges("classify", simple_condition)

10. 与普通边的对比

特性普通边条件边
路由方式固定动态
添加方法add_edge(A, B)add_conditional_edges(A, func)
使用场景线性流程分支逻辑
复杂度简单中等
示例START → node1 → ENDnode1 → [条件判断] → node2/node3

关键要点

  • 条件边 = 动态路由机制
  • 条件函数:输入状态 → 输出节点名
  • tools_condition:检查是否需要调用工具
  • 使用 Literal 类型提示提高安全性
  • 条件逻辑应简单快速,避免复杂计算


问题 6: 如何实现一个带多个工具的 Router?

💡 点击查看答案

答案:

实现多工具 Router 的关键是 正确绑定所有工具使用 ToolNode 自动路由

1. 完整代码示例

python
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition

# 定义多个工具
@tool
def add(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b

@tool
def divide(a: float, b: float) -> float:
    """Divide a by b."""
    if b == 0:
        return "Error: Division by zero"
    return a / b

@tool
def search_web(query: str) -> str:
    """Search the web for information."""
    # 实际实现中调用搜索 API
    return f"Search results for: {query}"

# 收集所有工具
tools = [add, multiply, divide, search_web]

# 创建 LLM 并绑定工具
llm = ChatOpenAI(model="gpt-4")
llm_with_tools = llm.bind_tools(tools)

# 定义节点
def call_model(state: MessagesState):
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

# 构建图
builder = StateGraph(MessagesState)
builder.add_node("llm", call_model)
builder.add_node("tools", ToolNode(tools))  # ← 传入所有工具

builder.add_edge(START, "llm")
builder.add_conditional_edges("llm", tools_condition)
builder.add_edge("tools", END)

graph = builder.compile()

2. 关键技术点

技术点 1:@tool 装饰器

python
from langchain_core.tools import tool

@tool
def my_function(param: type) -> return_type:
    """Clear description of what this tool does.

    Args:
        param: Description of parameter
    """
    return result

自动转换为:

  • 工具名称:函数名(my_function
  • 工具描述:Docstring 第一行
  • 参数模式:从类型注解生成
  • 调用接口:保持原函数逻辑

技术点 2:ToolNode 自动路由

python
ToolNode(tools)
# 内部实现(简化):
class ToolNode:
    def __init__(self, tools):
        self.tools_by_name = {t.name: t for t in tools}

    def __call__(self, state):
        last_message = state["messages"][-1]
        results = []

        for tool_call in last_message.tool_calls:
            tool = self.tools_by_name[tool_call["name"]]  # 自动匹配
            result = tool.invoke(tool_call["args"])
            results.append(ToolMessage(
                content=str(result),
                tool_call_id=tool_call["id"]
            ))

        return {"messages": results}

3. 测试不同工具

python
from langchain_core.messages import HumanMessage

# 测试 1:数学计算
result1 = graph.invoke({
    "messages": [HumanMessage("What is 15 + 23?")]
})
# LLM 选择 add(15, 23) → 返回 38

# 测试 2:乘法
result2 = graph.invoke({
    "messages": [HumanMessage("Multiply 7 by 8")]
})
# LLM 选择 multiply(7, 8) → 返回 56

# 测试 3:网络搜索
result3 = graph.invoke({
    "messages": [HumanMessage("Search for Python tutorials")]
})
# LLM 选择 search_web("Python tutorials")

# 测试 4:组合查询
result4 = graph.invoke({
    "messages": [HumanMessage("Calculate 100 / 4")]
})
# LLM 选择 divide(100, 4) → 返回 25.0

4. LLM 如何选择工具?

LLM 接收到的工具信息:

json
{
  "tools": [
    {
      "name": "add",
      "description": "Add two numbers together.",
      "parameters": {
        "type": "object",
        "properties": {
          "a": {"type": "integer"},
          "b": {"type": "integer"}
        }
      }
    },
    {
      "name": "multiply",
      "description": "Multiply two numbers.",
      "parameters": {
        "type": "object",
        "properties": {
          "a": {"type": "integer"},
          "b": {"type": "integer"}
        }
      }
    },
    ...
  ]
}

LLM 决策过程:

用户输入: "What is 15 + 23?"

LLM 分析: "这是加法问题"

查找工具: "add" 的描述匹配

提取参数: a=15, b=23

返回: tool_calls=[{"name": "add", "args": {"a": 15, "b": 23}}]

5. 错误处理

python
@tool
def divide(a: float, b: float) -> float:
    """Divide a by b. Returns error message if b is zero."""
    try:
        if b == 0:
            return "Error: Cannot divide by zero"
        return a / b
    except Exception as e:
        return f"Error: {str(e)}"

# 测试错误情况
result = graph.invoke({
    "messages": [HumanMessage("What is 10 divided by 0?")]
})
# 工具返回: "Error: Cannot divide by zero"
# LLM 收到错误信息,可以给出友好回复

6. 工具分类管理

对于大量工具,可以分类管理:

python
# 数学工具
math_tools = [add, multiply, divide]

# 搜索工具
search_tools = [search_web, search_database]

# 文件工具
file_tools = [read_file, write_file]

# 场景 1:数学助手(只绑定数学工具)
math_llm = llm.bind_tools(math_tools)

# 场景 2:通用助手(绑定所有工具)
general_llm = llm.bind_tools(math_tools + search_tools + file_tools)

7. 动态工具加载

python
def call_model_dynamic(state: MessagesState):
    # 根据用户消息动态选择工具
    last_message = state["messages"][-1].content.lower()

    if "calculate" in last_message or "math" in last_message:
        tools = math_tools
    elif "search" in last_message:
        tools = search_tools
    else:
        tools = math_tools + search_tools  # 默认全部

    llm_with_selected_tools = llm.bind_tools(tools)
    response = llm_with_selected_tools.invoke(state["messages"])
    return {"messages": [response]}

8. 工具调用统计

python
from collections import Counter

class ToolNodeWithStats(ToolNode):
    def __init__(self, tools):
        super().__init__(tools)
        self.call_stats = Counter()

    def __call__(self, state):
        last_message = state["messages"][-1]

        # 统计工具使用
        for tool_call in last_message.tool_calls:
            self.call_stats[tool_call["name"]] += 1

        # 执行工具
        result = super().__call__(state)

        # 打印统计
        print(f"Tool usage: {dict(self.call_stats)}")
        return result

# 使用
tool_node = ToolNodeWithStats(tools)
builder.add_node("tools", tool_node)

9. 最佳实践

python
# ✅ 好的工具定义
@tool
def calculate_discount(price: float, discount_percent: float) -> float:
    """Calculate the final price after applying a discount.

    Args:
        price: Original price in dollars
        discount_percent: Discount percentage (0-100)

    Returns:
        Final price after discount
    """
    if not (0 <= discount_percent <= 100):
        return "Error: Discount must be between 0 and 100"
    return price * (1 - discount_percent / 100)

# ❌ 不好的工具定义
@tool
def calc(p, d):  # 无类型注解
    """Calculate."""  # 描述不清楚
    return p * d  # 逻辑不明确

10. 完整示例:多功能助手

python
from langchain_core.tools import tool

# 工具集合
@tool
def get_weather(city: str) -> str:
    """Get current weather for a city."""
    return f"Weather in {city}: Sunny, 72°F"

@tool
def book_flight(origin: str, destination: str, date: str) -> str:
    """Book a flight ticket."""
    return f"Flight booked: {origin}{destination} on {date}"

@tool
def translate(text: str, target_lang: str) -> str:
    """Translate text to target language."""
    return f"[{target_lang}] {text}"

tools = [get_weather, book_flight, translate, add, multiply]

# 构建图
llm_with_tools = ChatOpenAI(model="gpt-4").bind_tools(tools)

def assistant(state: MessagesState):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "assistant")
builder.add_conditional_edges("assistant", tools_condition)
builder.add_edge("tools", END)

graph = builder.compile()

# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))

# 测试多样化查询
queries = [
    "What's the weather in New York?",
    "Book a flight from LA to NYC on March 15",
    "Translate 'Hello' to Spanish",
    "What is 25 times 4?"
]

for query in queries:
    result = graph.invoke({"messages": [HumanMessage(query)]})
    print(f"Query: {query}")
    print(f"Response: {result['messages'][-1].content}\n")

关键要点

  • 多工具绑定: llm.bind_tools([tool1, tool2, ...])
  • ToolNode 自动路由: 根据 tool_calls 中的 name 匹配工具
  • 工具定义关键: 清晰的 Docstring + 类型注解
  • LLM 自动选择: 基于描述和参数匹配
  • 可扩展性: 轻松添加新工具,无需修改图结构

问题 7: 如何让 Agent 避免无限循环?

💡 点击查看答案

答案:

防止 Agent 无限循环需要 设置递归限制智能停止条件

1. 问题:为什么会出现无限循环?

Agent 的循环结构:

python
builder.add_edge("tools", "agent")  # 工具执行后回到 agent

# 流程:
# agent → tools → agent → tools → agent → ...

可能导致无限循环的场景:

  • LLM 一直认为需要调用工具
  • 工具返回的结果无法让 LLM 满意
  • LLM 陷入"思考-行动"死循环

2. 方法 1:使用 recursion_limit

python
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
graph = builder.compile(
    checkpointer=memory,
    recursion_limit=10  # ← 最多执行 10 次节点
)

# 测试
try:
    result = graph.invoke({"messages": [HumanMessage("test")]})
except RecursionError:
    print("达到递归限制!")

工作原理:

Step 1: agent
Step 2: tools
Step 3: agent
...
Step 10: agent
Step 11: ❌ RecursionError: 达到限制

3. 方法 2:在状态中计数

python
from typing import Annotated
import operator

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    iterations: Annotated[int, operator.add]  # 累加计数器

def agent(state: AgentState):
    # 检查迭代次数
    current_iterations = state.get("iterations", 0)

    if current_iterations >= 10:
        return {
            "messages": [AIMessage("已达到最大迭代次数,停止执行。")],
            "iterations": 0  # 重置计数器
        }

    # 正常执行
    response = llm_with_tools.invoke(state["messages"])
    return {
        "messages": [response],
        "iterations": 1  # 每次增加 1
    }

# 初始状态
initial_state = {
    "messages": [HumanMessage("Start")],
    "iterations": 0
}

result = graph.invoke(initial_state)

4. 方法 3:智能停止条件

python
def tools_condition_with_limit(state: AgentState) -> Literal["tools", "__end__"]:
    """增强版 tools_condition,包含多重停止条件"""

    # 条件 1:检查迭代次数
    if state.get("iterations", 0) >= 15:
        return "__end__"

    # 条件 2:检查消息数量(防止对话过长)
    if len(state["messages"]) > 50:
        return "__end__"

    # 条件 3:检查是否有工具调用
    last_message = state["messages"][-1]
    if not hasattr(last_message, "tool_calls") or not last_message.tool_calls:
        return "__end__"

    # 条件 4:检查是否重复调用相同工具
    if len(last_message.tool_calls) > 0:
        tool_name = last_message.tool_calls[0]["name"]
        # 检查最近 3 次是否都调用同一工具
        recent_calls = []
        for msg in state["messages"][-6:]:  # 最近 3 轮(每轮 2 条消息)
            if hasattr(msg, "tool_calls") and msg.tool_calls:
                recent_calls.append(msg.tool_calls[0]["name"])

        if recent_calls.count(tool_name) >= 3:
            print(f"警告:工具 {tool_name} 被重复调用,停止执行")
            return "__end__"

    return "tools"

# 使用增强版条件
builder.add_conditional_edges("agent", tools_condition_with_limit)

5. 方法 4:超时机制

python
import time
from datetime import datetime, timedelta

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    start_time: float

def agent_with_timeout(state: AgentState):
    # 记录开始时间
    if "start_time" not in state:
        state["start_time"] = time.time()

    # 检查超时(例如:最多运行 60 秒)
    elapsed = time.time() - state["start_time"]
    if elapsed > 60:
        return {
            "messages": [AIMessage("执行超时,已停止。")],
            "start_time": state["start_time"]
        }

    # 正常执行
    response = llm_with_tools.invoke(state["messages"])
    return {
        "messages": [response],
        "start_time": state["start_time"]
    }

6. 方法 5:工具调用成本限制

python
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    total_tokens: int

def agent_with_budget(state: AgentState):
    # 检查 token 使用量
    total_tokens = state.get("total_tokens", 0)
    max_tokens = 10000

    if total_tokens >= max_tokens:
        return {
            "messages": [AIMessage(f"已达到 token 限制 ({max_tokens}),停止执行。")],
            "total_tokens": total_tokens
        }

    # 调用 LLM
    response = llm_with_tools.invoke(state["messages"])

    # 统计 token
    tokens_used = response.response_metadata.get("token_usage", {}).get("total_tokens", 0)

    return {
        "messages": [response],
        "total_tokens": total_tokens + tokens_used
    }

7. 方法 6:LLM 提示词优化

python
from langchain_core.messages import SystemMessage

system_prompt = SystemMessage(content="""
You are a helpful assistant. Follow these rules:

1. **Efficiency**: Minimize the number of tool calls needed
2. **Completion**: Once you have enough information, provide a final answer
3. **No Loops**: If a tool fails twice, stop trying and explain the issue
4. **Token Awareness**: Keep responses concise

When you have completed the task, respond with a final answer WITHOUT tool calls.
""")

def agent_with_prompt(state: MessagesState):
    messages = [system_prompt] + state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}

8. 完整示例:综合防护

python
from typing import Annotated
import operator
import time

class RobustAgentState(TypedDict):
    messages: Annotated[list, add_messages]
    iterations: Annotated[int, operator.add]
    total_tokens: int
    start_time: float

def robust_agent(state: RobustAgentState):
    """带多重保护的 Agent"""

    # 初始化
    if "start_time" not in state:
        state["start_time"] = time.time()

    iterations = state.get("iterations", 0)
    total_tokens = state.get("total_tokens", 0)
    elapsed = time.time() - state["start_time"]

    # 保护 1:迭代次数
    if iterations >= 10:
        return {
            "messages": [AIMessage("达到最大迭代次数 (10)。")],
            "iterations": 0
        }

    # 保护 2:执行时间
    if elapsed > 120:  # 2 分钟
        return {
            "messages": [AIMessage("执行超时 (120s)。")],
            "iterations": 0
        }

    # 保护 3:Token 预算
    if total_tokens >= 50000:
        return {
            "messages": [AIMessage("达到 token 限制 (50000)。")],
            "iterations": 0
        }

    # 正常执行
    try:
        response = llm_with_tools.invoke(state["messages"])
        tokens_used = response.response_metadata.get("token_usage", {}).get("total_tokens", 0)

        return {
            "messages": [response],
            "iterations": 1,
            "total_tokens": tokens_used
        }
    except Exception as e:
        return {
            "messages": [AIMessage(f"执行错误:{str(e)}")],
            "iterations": 0
        }

def robust_tools_condition(state: RobustAgentState):
    """增强版停止条件"""
    last_message = state["messages"][-1]

    # 检查是否有工具调用
    if not hasattr(last_message, "tool_calls") or not last_message.tool_calls:
        return "__end__"

    # 检查保护条件
    if state.get("iterations", 0) >= 10:
        return "__end__"

    return "tools"

# 构建图
builder = StateGraph(RobustAgentState)
builder.add_node("agent", robust_agent)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", robust_tools_condition)
builder.add_edge("tools", "agent")

# 使用 recursion_limit 作为最后防线
graph = builder.compile(recursion_limit=25)

9. 监控和调试

python
def agent_with_logging(state: AgentState):
    iterations = state.get("iterations", 0)
    print(f"[Iteration {iterations}] Agent executing...")

    response = llm_with_tools.invoke(state["messages"])

    # 记录工具调用
    if hasattr(response, "tool_calls") and response.tool_calls:
        for tc in response.tool_calls:
            print(f"  → Calling tool: {tc['name']}")
    else:
        print(f"  → Generating final response")

    return {
        "messages": [response],
        "iterations": 1
    }

10. 最佳实践总结

方法优先级适用场景实现难度
recursion_limit必须所有 Agent简单
迭代计数推荐长时间运行的 Agent中等
超时机制推荐生产环境中等
Token 限制可选成本敏感场景中等
智能停止条件推荐复杂 Agent较难
提示词优化必须所有 Agent简单

推荐配置:

python
# 1. 提示词明确指示何时停止
# 2. recursion_limit=20(合理上限)
# 3. 状态中跟踪迭代次数
# 4. 条件函数检查异常模式
# 5. 生产环境添加超时和 token 限制

关键要点

  • 多层防护: 不要依赖单一机制
  • recursion_limit: 最基本的保护
  • 状态计数: 更精细的控制
  • 智能条件: 检测异常循环模式
  • 监控日志: 便于调试和优化

问题 8: 如何实现跨会话的对话记忆?

💡 点击查看答案

答案:

跨会话对话记忆通过 Checkpointer + Thread ID 实现状态持久化和恢复。

1. 核心概念

会话(Session): 用户与 Agent 的一次完整交互过程 跨会话记忆: 在多次独立会话之间保持用户信息和对话历史

会话 1 (今天上午)
用户: "我的订单号是 12345"
Agent: "好的,订单 12345 已记录"

会话 2 (今天下午,新的浏览器标签)
用户: "那个订单发货了吗?"
Agent: "您的订单 12345 已于今天上午发货"  ✅ 记住了订单号

2. 基础实现:MemorySaver

python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState

# 创建检查点器
memory = MemorySaver()

# 编译图
graph = builder.compile(checkpointer=memory)

# 会话 1
config = {"configurable": {"thread_id": "user-123"}}
result1 = graph.invoke(
    {"messages": [HumanMessage("我的订单号是 12345")]},
    config
)

# 会话 2(稍后,同一 thread_id)
result2 = graph.invoke(
    {"messages": [HumanMessage("那个订单发货了吗?")]},
    config
)
# ✅ 自动加载之前的对话历史

3. 生产实现:PostgreSQL 持久化

python
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg2

# 连接数据库
conn_string = "postgresql://user:password@localhost:5432/langgraph_db"
checkpointer = PostgresSaver.from_conn_string(conn_string)

# 编译图(与 MemorySaver 接口完全相同)
graph = builder.compile(checkpointer=checkpointer)

# 使用方式完全相同
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke({"messages": [...]}, config)

PostgreSQL 表结构:

sql
CREATE TABLE checkpoints (
    thread_id TEXT,
    checkpoint_id TEXT PRIMARY KEY,
    parent_id TEXT,
    created_at TIMESTAMP,
    state JSONB,  -- 完整状态数据
    metadata JSONB
);

CREATE INDEX idx_thread_id ON checkpoints(thread_id);

4. Thread ID 设计模式

模式 1:按用户隔离

python
def get_config(user_id: str):
    return {"configurable": {"thread_id": f"user-{user_id}"}}

# 用户 A
config_a = get_config("alice")
graph.invoke(input, config_a)

# 用户 B(完全独立)
config_b = get_config("bob")
graph.invoke(input, config_b)

模式 2:按会话隔离

python
import uuid

def create_new_session(user_id: str):
    session_id = str(uuid.uuid4())
    return {"configurable": {"thread_id": f"{user_id}-{session_id}"}}

# 用户开始新对话
config = create_new_session("alice")
# thread_id: "alice-4f8d3c2a-..."

模式 3:按功能隔离

python
def get_config(user_id: str, context: str):
    return {"configurable": {"thread_id": f"{user_id}-{context}"}}

# 客服对话
support_config = get_config("alice", "support")

# 订单查询
order_config = get_config("alice", "order")

# 两个独立的对话线程

5. 扩展状态:用户偏好记忆

python
class UserState(TypedDict):
    messages: Annotated[list, add_messages]
    user_id: str
    preferences: dict  # 用户偏好
    conversation_count: Annotated[int, operator.add]

def agent_with_preferences(state: UserState):
    # 读取用户偏好
    prefs = state.get("preferences", {})
    style = prefs.get("response_style", "formal")

    # 根据偏好调整系统提示
    if style == "casual":
        system_msg = "You are a friendly, casual assistant."
    else:
        system_msg = "You are a professional, formal assistant."

    messages = [SystemMessage(system_msg)] + state["messages"]
    response = llm.invoke(messages)

    return {
        "messages": [response],
        "conversation_count": 1
    }

# 会话 1:设置偏好
initial_state = {
    "messages": [HumanMessage("我喜欢简洁的回答")],
    "user_id": "alice",
    "preferences": {"response_style": "casual"},
    "conversation_count": 0
}

config = {"configurable": {"thread_id": "alice"}}
graph.invoke(initial_state, config)

# 会话 2:自动应用偏好
later_state = {
    "messages": [HumanMessage("解释量子计算")]
}
result = graph.invoke(later_state, config)
# ✅ 自动加载 preferences,使用 casual 风格

6. 时间限制的记忆

python
from datetime import datetime, timedelta

class TimedState(TypedDict):
    messages: Annotated[list, add_messages]
    last_active: str  # ISO 格式时间戳

def trim_old_messages(state: TimedState) -> TimedState:
    """删除超过 7 天的消息"""
    cutoff = datetime.now() - timedelta(days=7)

    # 解析 last_active
    if "last_active" in state:
        last_active = datetime.fromisoformat(state["last_active"])
        if last_active < cutoff:
            # 太久没活动,清空历史
            return {
                "messages": [],
                "last_active": datetime.now().isoformat()
            }

    # 更新活动时间
    return {
        **state,
        "last_active": datetime.now().isoformat()
    }

def agent(state: TimedState):
    # 先清理旧消息
    state = trim_old_messages(state)

    # 正常处理
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

7. 多设备同步

python
from fastapi import FastAPI, HTTPException
from langgraph_sdk import get_client

app = FastAPI()
client = get_client(url="https://your-langgraph-cloud.app")

@app.post("/chat")
async def chat(user_id: str, message: str, device_id: str):
    """支持多设备访问同一对话"""

    # 使用 user_id 作为 thread_id(不包含 device_id)
    config = {"configurable": {"thread_id": f"user-{user_id}"}}

    # 执行图
    response = await client.runs.create(
        thread_id=config["configurable"]["thread_id"],
        assistant_id="agent",
        input={"messages": [HumanMessage(message)]},
        metadata={"device_id": device_id}  # 记录设备信息
    )

    return {"response": response}

# 使用场景:
# 设备 1 (手机): POST /chat?user_id=alice&message=Hi&device_id=phone
# 设备 2 (电脑): POST /chat?user_id=alice&message=继续&device_id=laptop
# 两个设备看到相同的对话历史

8. 查询历史对话

python
def get_conversation_history(thread_id: str, limit: int = 10):
    """获取对话历史"""
    config = {"configurable": {"thread_id": thread_id}}

    # 获取所有检查点
    checkpoints = list(graph.checkpointer.list(config))

    # 按时间倒序
    checkpoints.sort(key=lambda x: x.created_at, reverse=True)

    # 提取消息
    history = []
    for cp in checkpoints[:limit]:
        messages = cp.state.get("messages", [])
        history.append({
            "timestamp": cp.created_at,
            "messages": [
                {"role": m.type, "content": m.content}
                for m in messages
            ]
        })

    return history

# 使用
history = get_conversation_history("user-alice")
for session in history:
    print(f"\n=== {session['timestamp']} ===")
    for msg in session['messages']:
        print(f"{msg['role']}: {msg['content']}")

9. 清理和过期管理

python
import asyncio
from datetime import datetime, timedelta

async def cleanup_old_threads(checkpointer, days: int = 30):
    """定期清理超过 N 天的对话"""
    cutoff = datetime.now() - timedelta(days=days)

    # 获取所有 thread
    all_threads = await checkpointer.list_threads()

    deleted_count = 0
    for thread_id in all_threads:
        config = {"configurable": {"thread_id": thread_id}}
        checkpoints = list(checkpointer.list(config))

        if not checkpoints:
            continue

        # 检查最后活动时间
        last_checkpoint = max(checkpoints, key=lambda x: x.created_at)
        if last_checkpoint.created_at < cutoff:
            checkpointer.delete_thread(config)
            deleted_count += 1
            print(f"Deleted thread: {thread_id}")

    print(f"Total deleted: {deleted_count} threads")

# 定期运行(例如每天凌晨)
async def scheduled_cleanup():
    while True:
        await cleanup_old_threads(checkpointer, days=30)
        await asyncio.sleep(86400)  # 24 小时

10. 完整示例:生产级记忆系统

python
from langgraph.checkpoint.postgres import PostgresSaver
from typing import Annotated
import operator
from datetime import datetime

class ProductionState(TypedDict):
    messages: Annotated[list, add_messages]
    user_id: str
    user_name: str
    preferences: dict
    session_count: Annotated[int, operator.add]
    last_active: str
    metadata: dict

# 数据库连接
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph"
)

def production_agent(state: ProductionState):
    # 更新活动时间
    state["last_active"] = datetime.now().isoformat()

    # 个性化问候
    if state.get("session_count", 0) == 0:
        greeting = f"你好 {state.get('user_name', '用户')}!很高兴为你服务。"
    else:
        greeting = f"欢迎回来,{state.get('user_name')}!"

    # 构建消息
    messages = [SystemMessage(greeting)] + state["messages"]
    response = llm.invoke(messages)

    return {
        "messages": [response],
        "session_count": 1,
        "last_active": state["last_active"]
    }

# 构建图
builder = StateGraph(ProductionState)
builder.add_node("agent", production_agent)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)

graph = builder.compile(checkpointer=checkpointer)

# API 端点
@app.post("/api/chat")
async def chat_endpoint(
    user_id: str,
    message: str,
    user_name: str = "用户"
):
    config = {"configurable": {"thread_id": f"user-{user_id}"}}

    # 调用图
    result = await graph.ainvoke(
        {
            "messages": [HumanMessage(message)],
            "user_id": user_id,
            "user_name": user_name,
            "preferences": {},  # 可以从数据库加载
            "session_count": 0,
            "metadata": {"endpoint": "/api/chat"}
        },
        config
    )

    return {
        "response": result["messages"][-1].content,
        "session_count": result.get("session_count", 0)
    }

关键要点

  • Checkpointer + Thread ID = 跨会话记忆
  • MemorySaver:开发测试
  • PostgresSaver:生产环境
  • Thread ID 设计:按用户/会话/功能隔离
  • 定期清理:避免无限增长

由于篇幅限制,让我继续创建剩余的问题 9-15...

问题 9: 如何在本地测试后部署到云端?

💡 点击查看答案

答案:

本地测试到云端部署的完整流程包括 本地开发 → GitHub 推送 → LangSmith 配置 → 云端部署

步骤 1:本地开发和测试

bash
# 项目结构
my-agent/
├── agent.py              # 图定义
├── langgraph.json        # 配置文件
├── .env                  # 环境变量(不提交)
├── .env.example          # 环境变量模板
├── .gitignore           # Git 忽略文件
└── requirements.txt      # Python 依赖

# 启动本地开发服务器
cd my-agent
langgraph dev

# 输出:
# 🚀 API: http://127.0.0.1:2024
# 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024

步骤 2:Git 准备

bash
# .gitignore 内容
.env
*.env
__pycache__/
.venv/

# 初始化 Git
git init
git add .
git commit -m "Initial commit: LangGraph agent"

# 创建 GitHub 仓库并推送
git remote add origin https://github.com/username/my-agent.git
git push -u origin main

步骤 3:LangSmith 部署配置

  1. 访问 LangSmith
  2. 导航到 DeploymentsNew Deployment
  3. 选择 Deploy from GitHub
  4. 配置:
    • Repository: username/my-agent
    • Branch: main
    • Config file: langgraph.json
  5. 设置环境变量(从 .env 复制)
  6. 点击 Deploy

步骤 4:使用 SDK 调用云端

python
from langgraph_sdk import get_client

# 本地测试
local_client = get_client(url="http://127.0.0.1:2024")

# 云端部署
cloud_client = get_client(url="https://your-deployment.langgraph.app")

# API 完全相同!
async for chunk in cloud_client.runs.stream(
    thread_id="user-123",
    assistant_id="agent",
    input={"messages": [...]},
    stream_mode="values"
):
    print(chunk)

关键要点

  • 本地测试: langgraph dev 快速迭代
  • Git 管理: 不要提交 .env 文件
  • 环境变量: 云端需手动配置
  • SDK 统一: 本地/云端 API 完全一致

问题 10: 如何处理工具调用失败的情况?

💡 点击查看答案

答案:

工具调用失败处理需要 异常捕获 + 错误消息返回 + 重试机制

方法 1:工具内部错误处理

python
from langchain_core.tools import tool

@tool
def divide(a: float, b: float) -> str:
    """Divide a by b with error handling."""
    try:
        if b == 0:
            return "Error: Division by zero is not allowed"
        result = a / b
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {str(e)}"

方法 2:ToolNode 包装器

python
class SafeToolNode:
    def __init__(self, tools):
        self.tool_node = ToolNode(tools)

    def __call__(self, state: MessagesState):
        try:
            return self.tool_node(state)
        except Exception as e:
            # 返回错误消息给 LLM
            last_message = state["messages"][-1]
            error_msg = ToolMessage(
                content=f"Tool execution failed: {str(e)}",
                tool_call_id=last_message.tool_calls[0]["id"] if last_message.tool_calls else "error"
            )
            return {"messages": [error_msg]}

# 使用
builder.add_node("tools", SafeToolNode(tools))

方法 3:重试机制

python
from tenacity import retry, stop_after_attempt, wait_exponential

@tool
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def api_call(endpoint: str) -> str:
    """Call external API with retry."""
    response = requests.get(endpoint, timeout=10)
    response.raise_for_status()
    return response.text

方法 4:Agent 级别处理

python
def agent_with_error_recovery(state: MessagesState):
    last_message = state["messages"][-1]

    # 检查是否是错误消息
    if isinstance(last_message, ToolMessage) and "Error:" in last_message.content:
        # LLM 分析错误并决定下一步
        recovery_prompt = SystemMessage(
            "The tool call failed. Analyze the error and either retry with different parameters or provide an alternative solution."
        )
        messages = [recovery_prompt] + state["messages"]
        response = llm_with_tools.invoke(messages)
    else:
        response = llm_with_tools.invoke(state["messages"])

    return {"messages": [response]}

关键要点

  • 工具内部: try-except 捕获异常
  • 返回错误消息: 让 LLM 知道失败原因
  • 重试机制: 对临时失败自动重试
  • Agent 恢复: 分析错误并调整策略

问题 11: 设计一个客服机器人应该选择哪种架构模式?

💡 点击查看答案

答案:

客服机器人应选择 Agent + Memory 架构,结合 Router 进行意图分类。

推荐架构:分层设计

python
# 第 1 层:意图分类(Router)
def classify_intent(state) -> Literal["faq", "order_query", "complaint", "general"]:
    last_message = state["messages"][-1].content
    # 使用 LLM 分类或关键词匹配
    classification = llm.invoke(f"Classify intent: {last_message}")
    return classification

# 第 2 层:专业处理(Agent)
def order_agent(state):
    """处理订单相关查询(可能需要多步骤)"""
    tools = [check_order_status, track_shipment, cancel_order]
    llm_with_tools = llm.bind_tools(tools)
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

def faq_agent(state):
    """处理常见问题(简单查询)"""
    # 从知识库检索答案
    answer = search_knowledge_base(state["messages"][-1].content)
    return {"messages": [AIMessage(answer)]}

# 构建图
builder = StateGraph(MessagesState)
builder.add_node("classifier", lambda s: s)
builder.add_node("order_agent", order_agent)
builder.add_node("faq_agent", faq_agent)
builder.add_node("tools", ToolNode(tools))

# 路由逻辑
builder.add_edge(START, "classifier")
builder.add_conditional_edges("classifier", classify_intent)
builder.add_conditional_edges("order_agent", tools_condition)
builder.add_edge("tools", "order_agent")  # Agent 模式的循环
builder.add_edge("faq_agent", END)
builder.add_edge("order_agent", END)

# 添加 Memory
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
graph = builder.compile(checkpointer=checkpointer)

为什么这样设计?

  1. Router(意图分类): 快速分流简单问题
  2. Agent(订单处理): 处理需要多步骤的复杂查询
  3. Memory(对话记忆): 记住用户信息和上下文
  4. 分层处理: 平衡效率和能力

使用示例

python
config = {"configurable": {"thread_id": "customer-12345"}}

# 会话 1:订单查询
graph.invoke({"messages": [HumanMessage("我的订单 #98765 在哪里?")]}, config)
# → 路由到 order_agent → 调用 track_shipment → 返回物流信息

# 会话 2:后续问题(利用 Memory)
graph.invoke({"messages": [HumanMessage("那个订单可以取消吗?")]}, config)
# → 记住订单号 #98765 → 调用 cancel_order

关键要点

  • 分层架构: Router + Agent 结合
  • 工具分类: 不同场景使用不同工具集
  • Memory 必需: 客服需要记住上下文
  • 错误处理: 工具失败时提供人工转接选项

问题 12: 如何优化 Agent 的 Token 使用?

💡 点击查看答案

答案:

优化 Token 使用需要 消息修剪 + 提示词优化 + 工具选择策略

方法 1:限制消息历史长度

python
from langchain_core.messages import trim_messages

def agent_with_trim(state: MessagesState):
    # 只保留最近 10 条消息
    trimmed_messages = trim_messages(
        state["messages"],
        max_tokens=2000,
        strategy="last",
        token_counter=llm
    )

    response = llm_with_tools.invoke(trimmed_messages)
    return {"messages": [response]}

方法 2:消息摘要

python
def summarize_old_messages(state: MessagesState):
    messages = state["messages"]

    if len(messages) > 20:
        # 摘要前 10 条消息
        old_messages = messages[:10]
        summary = llm.invoke([
            SystemMessage("Summarize this conversation in 2-3 sentences:"),
            *old_messages
        ])

        # 保留摘要 + 最近消息
        new_messages = [
            SystemMessage(f"Previous conversation summary: {summary.content}"),
            *messages[10:]
        ]
        return {"messages": new_messages}

    return state

方法 3:减少系统提示长度

python
# ❌ 不好:冗长的系统提示
system_msg = SystemMessage(content="""
You are a highly intelligent, professional, and courteous AI assistant...
[500 words of instructions]
""")

# ✅ 好:简洁的系统提示
system_msg = SystemMessage(content="""
You are a helpful assistant. Be concise and accurate.
Use tools when needed. Provide final answers without tool calls.
""")

方法 4:智能工具选择

python
def agent_with_selective_tools(state: MessagesState):
    last_message = state["messages"][-1].content.lower()

    # 根据意图只绑定相关工具
    if "calculate" in last_message:
        tools = [add, multiply, divide]
    elif "search" in last_message:
        tools = [search_web]
    else:
        tools = [add, search_web]  # 最小工具集

    llm_with_tools = llm.bind_tools(tools)
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

方法 5:使用更便宜的模型

python
# 分级模型策略
def get_llm_for_task(task_complexity: str):
    if task_complexity == "simple":
        return ChatOpenAI(model="gpt-3.5-turbo")  # 便宜
    elif task_complexity == "complex":
        return ChatOpenAI(model="gpt-4")  # 强大
    else:
        return ChatOpenAI(model="gpt-4o-mini")  # 平衡

方法 6:Token 使用监控

python
class TokenTrackingState(TypedDict):
    messages: Annotated[list, add_messages]
    total_tokens: int
    cost_usd: float

def agent_with_tracking(state: TokenTrackingState):
    response = llm_with_tools.invoke(state["messages"])

    # 提取 token 使用
    usage = response.response_metadata.get("token_usage", {})
    tokens = usage.get("total_tokens", 0)

    # 计算成本(GPT-4 示例:$0.03/1K tokens)
    cost = (tokens / 1000) * 0.03

    return {
        "messages": [response],
        "total_tokens": tokens,
        "cost_usd": cost
    }

关键要点

  • 消息修剪: 限制历史长度
  • 摘要策略: 压缩旧对话
  • 提示词优化: 简洁明确
  • 选择性工具: 减少函数描述开销
  • 监控成本: 实时跟踪 token 使用

问题 13: 如何设计多用户并发的 Agent 系统?

💡 点击查看答案

答案:

多用户并发系统需要 Thread ID 隔离 + 数据库 Checkpointer + 异步处理

核心设计原则

python
# 每个用户独立的 Thread ID
def get_user_config(user_id: str, session_id: str = None):
    if session_id:
        thread_id = f"user-{user_id}-session-{session_id}"
    else:
        thread_id = f"user-{user_id}"

    return {"configurable": {"thread_id": thread_id}}

生产级架构:FastAPI + PostgreSQL

python
from fastapi import FastAPI, HTTPException
from langgraph_sdk import get_client
from langgraph.checkpoint.postgres import PostgresSaver
import asyncio

app = FastAPI()

# 数据库 Checkpointer(支持并发)
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph"
)

# 编译图
graph = builder.compile(checkpointer=checkpointer)

@app.post("/chat")
async def chat_endpoint(user_id: str, message: str):
    """处理单个用户请求(异步)"""
    config = get_user_config(user_id)

    # 异步调用(非阻塞)
    result = await graph.ainvoke(
        {"messages": [HumanMessage(message)]},
        config
    )

    return {"response": result["messages"][-1].content}

@app.post("/chat/batch")
async def batch_chat(requests: list[dict]):
    """批量处理多个用户请求(并发)"""
    tasks = []

    for req in requests:
        config = get_user_config(req["user_id"])
        task = graph.ainvoke(
            {"messages": [HumanMessage(req["message"])]},
            config
        )
        tasks.append(task)

    # 并发执行
    results = await asyncio.gather(*tasks)

    return {"responses": [r["messages"][-1].content for r in results]}

负载均衡和扩展

python
# 使用 Redis 作为分布式锁
from redis import Redis
import hashlib

redis_client = Redis(host='localhost', port=6379, db=0)

def get_shard_for_user(user_id: str, num_shards: int = 4) -> int:
    """将用户分配到不同的处理分片"""
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    return hash_value % num_shards

@app.post("/chat/balanced")
async def balanced_chat(user_id: str, message: str):
    """负载均衡的聊天端点"""
    shard = get_shard_for_user(user_id)

    # 使用分布式锁防止并发写入
    lock_key = f"user_lock:{user_id}"

    with redis_client.lock(lock_key, timeout=30):
        config = get_user_config(user_id)
        result = await graph.ainvoke(
            {"messages": [HumanMessage(message)]},
            config
        )

    return {
        "response": result["messages"][-1].content,
        "shard": shard
    }

监控和限流

python
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter

@app.on_event("startup")
async def startup():
    await FastAPILimiter.init(redis_client)

@app.post("/chat", dependencies=[RateLimiter(times=10, seconds=60)])
async def rate_limited_chat(user_id: str, message: str):
    """每个用户每分钟最多 10 次请求"""
    config = get_user_config(user_id)
    result = await graph.ainvoke({"messages": [HumanMessage(message)]}, config)
    return {"response": result["messages"][-1].content}

关键要点

  • Thread ID 隔离: 每个用户独立状态
  • 异步处理: 使用 ainvokeasyncio
  • 数据库 Checkpointer: PostgreSQL 支持并发
  • 负载均衡: 用户分片和分布式锁
  • 限流保护: 防止滥用

问题 14: Simple Graph vs Chain vs Agent:如何选择?

💡 点击查看答案

答案:

选择架构模式基于 任务复杂度、决策需求、工具数量

决策树

你的任务需求:

是否需要 LLM?
├─ 否 → Simple Graph(纯逻辑流程)
└─ 是 ↓

是否需要调用工具?
├─ 否 → 简单 LLM 调用(不需要 LangGraph)
└─ 是 ↓

工具调用后是否需要 LLM 继续推理?
├─ 否 → Router / Chain(单次工具调用)
└─ 是 ↓

是否需要多轮工具调用?
├─ 否 → Router(一次决策)
└─ 是 → Agent(循环推理)

是否需要跨会话记忆?
├─ 否 → 无 Checkpointer
└─ 是 → Agent + Memory

详细对比表

场景Simple GraphChainRouterAgentAgent + Memory
Hello World 示例
单次工具调用可以可以
多次工具调用
需要上下文记忆
复杂度
成本

实际案例推荐

场景 1:简单问答

python
# 推荐:Simple Graph 或直接 LLM
response = llm.invoke("What is Python?")

场景 2:天气查询

python
# 推荐:Router
tools = [get_weather]
llm_with_tools = llm.bind_tools(tools)
# 一次调用工具,返回结果

场景 3:订单处理

python
# 推荐:Agent
# 可能需要:查询订单 → 检查库存 → 更新状态
tools = [check_order, check_inventory, update_order]
# 需要多步推理

场景 4:客服聊天机器人

python
# 推荐:Agent + Memory
# 需要记住用户信息和对话历史
checkpointer = PostgresSaver(...)
graph = builder.compile(checkpointer=checkpointer)

场景 5:数据分析助手

python
# 推荐:Agent + Memory
# 查询 → 分析 → 可视化(多步骤)
# 记住数据集和分析历史

关键要点

  • Simple Graph: 学习和简单流程
  • Chain/Router: 单次工具调用
  • Agent: 多步推理任务
  • Agent + Memory: 多轮对话系统

问题 15: 如何设计可扩展的工具系统?

💡 点击查看答案

答案:

可扩展工具系统需要 模块化设计 + 动态加载 + 工具注册表

设计 1:工具注册表模式

python
from typing import Dict, List
from langchain_core.tools import tool

class ToolRegistry:
    """工具注册中心"""

    def __init__(self):
        self.tools: Dict[str, callable] = {}
        self.categories: Dict[str, List[str]] = {}

    def register(self, category: str = "general"):
        """装饰器:注册工具"""
        def decorator(func):
            tool_func = tool(func)
            self.tools[func.__name__] = tool_func
            self.categories.setdefault(category, []).append(func.__name__)
            return tool_func
        return decorator

    def get_tools(self, category: str = None) -> List:
        """获取工具列表"""
        if category:
            tool_names = self.categories.get(category, [])
            return [self.tools[name] for name in tool_names]
        return list(self.tools.values())

# 全局注册表
registry = ToolRegistry()

# 注册工具
@registry.register(category="math")
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

@registry.register(category="math")
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b

@registry.register(category="search")
def search_web(query: str) -> str:
    """Search the web."""
    return f"Results for: {query}"

# 使用
math_tools = registry.get_tools("math")
all_tools = registry.get_tools()

设计 2:插件化工具加载

python
import importlib
import os
from pathlib import Path

class ToolLoader:
    """动态加载工具插件"""

    def __init__(self, plugin_dir: str = "plugins"):
        self.plugin_dir = Path(plugin_dir)
        self.tools = []

    def load_plugins(self):
        """从插件目录加载所有工具"""
        for file in self.plugin_dir.glob("*.py"):
            if file.stem.startswith("_"):
                continue

            # 动态导入模块
            module_name = f"plugins.{file.stem}"
            module = importlib.import_module(module_name)

            # 查找所有 @tool 装饰的函数
            for attr_name in dir(module):
                attr = getattr(module, attr_name)
                if callable(attr) and hasattr(attr, "name"):
                    self.tools.append(attr)

        return self.tools

# 项目结构
# plugins/
# ├── math_tools.py
# ├── search_tools.py
# └── file_tools.py

# 使用
loader = ToolLoader("plugins")
tools = loader.load_plugins()

设计 3:条件工具加载

python
def get_tools_for_context(context: str, user_role: str) -> List:
    """根据上下文和用户角色返回工具"""

    base_tools = [search_web, calculate]

    if context == "customer_service":
        tools = base_tools + [check_order, track_shipment]

        if user_role == "admin":
            tools.append(cancel_order)  # 只有管理员可以取消订单

    elif context == "data_analysis":
        tools = base_tools + [query_database, generate_chart]

    else:
        tools = base_tools

    return tools

# 使用
def agent_with_dynamic_tools(state: MessagesState):
    context = state.get("context", "general")
    user_role = state.get("user_role", "user")

    tools = get_tools_for_context(context, user_role)
    llm_with_tools = llm.bind_tools(tools)

    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

设计 4:工具版本管理

python
class VersionedTool:
    """支持版本管理的工具包装器"""

    def __init__(self, name: str, func: callable, version: str):
        self.name = name
        self.func = tool(func)
        self.version = version

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

class ToolManager:
    def __init__(self):
        self.tools: Dict[str, Dict[str, VersionedTool]] = {}

    def register(self, name: str, func: callable, version: str = "1.0"):
        if name not in self.tools:
            self.tools[name] = {}
        self.tools[name][version] = VersionedTool(name, func, version)

    def get_tool(self, name: str, version: str = "latest"):
        if version == "latest":
            version = max(self.tools[name].keys())
        return self.tools[name][version].func

# 使用
manager = ToolManager()
manager.register("search", search_web_v1, "1.0")
manager.register("search", search_web_v2, "2.0")

# 获取最新版本
tool = manager.get_tool("search", "latest")

设计 5:工具组合

python
from langchain_core.tools import tool

def create_tool_pipeline(*tools):
    """将多个工具组合成一个工具"""

    @tool
    def pipeline_tool(input_data: str) -> str:
        """Execute a pipeline of tools."""
        result = input_data
        for t in tools:
            result = t.invoke(result)
        return result

    return pipeline_tool

# 使用
fetch_tool = create_fetch_tool()
parse_tool = create_parse_tool()
summarize_tool = create_summarize_tool()

# 组合工具
research_tool = create_tool_pipeline(fetch_tool, parse_tool, summarize_tool)

关键要点

  • 注册表模式: 中心化管理所有工具
  • 插件系统: 动态加载和卸载
  • 条件加载: 根据上下文选择工具
  • 版本管理: 支持工具演进
  • 工具组合: 复杂功能的模块化

🎉 复习完成!

恭喜你完成了 Module-2 的全部 15 个复习问题!让我们回顾一下你已经掌握的知识:

📊 知识掌握度自测

类别问题编号掌握程度
基础理解1-5⬜⬜⬜⬜⬜
代码实现6-10⬜⬜⬜⬜⬜
架构设计11-15⬜⬜⬜⬜⬜

自测方法:

  • ⬜ = 不理解
  • ☑ = 基本理解
  • ✅ = 完全掌握

🎯 下一步学习路径

完成 Module-2 后,你已经掌握了:

  • ✅ LangGraph 的六大核心架构模式
  • ✅ 状态管理和 Reducer 机制
  • ✅ Router 和 Agent 的区别与应用
  • ✅ 工具系统的设计与实现
  • ✅ 记忆管理和状态持久化
  • ✅ 生产环境部署流程

继续前进:

Module-3:高级状态管理

  • 多状态模式(Multiple Schemas)
  • 消息过滤和修剪
  • 外部记忆系统集成

Module-4:人机协作

  • 断点(Breakpoints)机制
  • 状态编辑和人工反馈
  • 时间旅行调试

Module-5:并行和子图

  • Map-Reduce 模式
  • 并行节点执行
  • 子图复用

💪 实践建议

  1. 动手编码: 每个问题都尝试自己实现一遍
  2. 修改实验: 改变参数,观察行为变化
  3. 构建项目: 选择一个实际场景(如客服机器人)完整实现
  4. 阅读源码: 深入 LangGraph 源代码理解内部实现
  5. 社区交流: 在 LangChain Discord 或 GitHub 讨论问题

📚 参考资源


最后寄语

架构模式不是死记硬背的公式,而是解决问题的工具箱。真正的精通来自于:

  • 理解每种模式的本质(为什么这样设计?)
  • 认识每种模式的边界(什么场景不适用?)
  • 掌握模式间的权衡(如何选择和组合?)

继续保持好奇心,勇于实践,你一定能成为 LangGraph 的专家!

—— 来自你的图灵奖获得者朋友


🎊 Module-2 复习完成!共计约 15,000 字深度问答,覆盖所有核心概念。

基于 MIT 许可证发布。内容版权归作者所有。