
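5.1 Memory Agent - Detailed Walkthrough

I. Overview

1.1 About This Section

This section is the first part of LangChain Academy Module-5. Its goal is to build an agent with long-term memory. The agent is named task_mAIstro, and its main job is to help the user manage a ToDo list.

Unlike the simple chatbots built earlier, task_mAIstro has the following characteristics:

  1. Deliberate saving: it decides on its own when to save a memory instead of saving on every turn
  2. Multiple memory types: it manages three kinds of memory:
    • User profile (Profile) - semantic memory
    • ToDo list (ToDo Collection) - semantic memory
    • User preferences (Instructions) - procedural memory
  3. Cross-session memory: memories persist and can be accessed across different conversation sessions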
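1.2 Learning Objectives

After working through this section, you will know:

  1. How to use the Trustcall library to manage and update structured data
  2. How to monitor and trace Trustcall's tool calls
  3. How to build an agent with a ReAct architecture
  4. How to implement short-term and long-term memory in LangGraph
  5. How to use conditional edges to implement routing
  6. Advanced usage of Pydantic models

1.3 Relation to Earlier Material

Earlier in Module-5, we learned:

  • How to create a chatbot that saves a single type of memory (Profile or Collection)
  • How to use Trustcall to update structured data schemas

This section combines these pieces into a more complete and practical agent system.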


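II. Core Concepts

2.1 What Is a Memory Agent

An agent is a program that can make decisions and carry out tasks on its own. It usually has the following capabilities:

  1. Perception: receives user input and information from its environment
  2. Reasoning: analyzes the information and makes decisions
  3. Action: calls tools to carry out concrete tasks
  4. Memory: saves important information for later use

A Memory Agent is an agent with memory capabilities. It can:

  • Remember the user's personal information
  • Remember past conversation content
  • Remember the user's preferences
  • Access these memories across different sessions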

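2.2 Three Kinds of Memory

Cognitive science distinguishes several types of human memory. This section implements two of those types, semantic and procedural, spread across three stores (Profile, ToDo Collection, Instructions):

2.2.1 Semantic Memory

Semantic memory is memory of facts and knowledge. In this example it includes:

User Profile

  • Name, location, job
  • Family members
  • Interests

ToDo Collection

  • The content of each task
  • Estimated time to complete
  • Deadline
  • Concrete solutions
  • Task status

2.2.2 Procedural Memory

Procedural memory is memory of how to do something. In this example it is:

Instructions (preferences)

  • How to create ToDo items
  • How the user wants tasks organized
  • Special requirements (for example: include local business information)

2.2.3 Short-term vs Long-term Memory

Short-term memory (within-thread memory)

  • Kept within a single session
  • Implemented with MemorySaver (a checkpointer)
  • Stores the conversation history
  • May be lost once the session ends

Long-term memory (across-thread memory)

  • Kept across multiple sessions
  • Implemented with InMemoryStore (a store)
  • Stores Profile, ToDo, and Instructions
  • Accessible from any session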

┌────────────────────────────────────────────┐
│                Memory Agent                │
│                                            │
│  ┌──────────────────────────────────────┐  │
│  │ Short-term memory (MemorySaver)      │  │
│  │  - Conversation history              │  │
│  │  - Current session state             │  │
│  └──────────────────────────────────────┘  │
│                                            │
│  ┌──────────────────────────────────────┐  │
│  │ Long-term memory (InMemoryStore)     │  │
│  │  ┌────────────────────────────────┐  │  │
│  │  │ Profile (semantic)             │  │  │
│  │  └────────────────────────────────┘  │  │
│  │  ┌────────────────────────────────┐  │  │
│  │  │ ToDo Collection (semantic)     │  │  │
│  │  └────────────────────────────────┘  │  │
│  │  ┌────────────────────────────────┐  │  │
│  │  │ Instructions (procedural)      │  │  │
│  │  └────────────────────────────────┘  │  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘

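2.3 The Trustcall Library

2.3.1 What Is Trustcall

Trustcall is a library for extracting structured data from unstructured conversation. It can:

  1. Define data schemas with Pydantic
  2. Extract information from a conversation that matches a schema
  3. Update existing data records automatically
  4. Handle validation errors and correct itself

2.3.2 Why Trustcall Is Needed

Suppose the user says: "I live in San Francisco with my wife and my one-year-old daughter."

We need to turn this sentence into structured data:

python
{
    "name": "Lance",  # assumed to be known from earlier in the conversation
    "location": "San Francisco",
    "connections": ["wife", "one-year-old daughter"]
}

Trustcall performs this conversion automatically and makes sure the data conforms to the schema we defined.

2.3.3 Core Features of Trustcall

  1. Creating new records: extracts information and creates new data objects
  2. Updating existing records: updates existing data with JSON Patch
  3. Self-correction: fixes the output automatically when validation fails
  4. Parallel processing: can create several records at once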
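III. Environment Setup and Dependencies

3.1 Installing Dependencies

python
%%capture --no-stderr
%pip install -U langchain_openai langgraph trustcall langchain_core

About the packages

  • langchain_openai: the LangChain integration for OpenAI models
  • langgraph: a framework for building stateful, multi-actor applications
  • trustcall: the structured data extraction library
  • langchain_core: LangChain core components

3.2 Configuring Environment Variables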

python
import os, getpass

def _set_env(var: str):
    # Check whether the environment variable is already set
    env_value = os.environ.get(var)
    if not env_value:
        # If it is not set, prompt the user for it
        env_value = getpass.getpass(f"{var}: ")

    # Set the environment variable for the current process
    os.environ[var] = env_value

# Configure LangSmith (used for tracing and debugging)
_set_env("LANGSMITH_API_KEY")
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "langchain-academy"

# Configure OpenAI
_set_env("OPENAI_API_KEY")

Python Notes

  1. getpass.getpass(): reads a secret from the user without echoing it to the screen
  2. os.environ.get(): reads an environment variable and returns None if it does not exist
  3. os.environ[key] = value: sets an environment variable

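IV. Trustcall in Depth

4.1 Defining the Data Schema

First, we define the data structures with Pydantic:

python
from pydantic import BaseModel, Field

class Memory(BaseModel):
    content: str = Field(
        description="The main content of the memory. For example: User expressed interest in learning about French."
    )

class MemoryCollection(BaseModel):
    memories: list[Memory] = Field(
        description="A list of memories about the user."
    )

Python Notes

  1. BaseModel: the Pydantic base class used to define data models
  2. Field: defines field metadata such as the description, default value, and validation rules
  3. Type annotations:
    • str: a string
    • list[Memory]: a list of Memory objects

What Pydantic Provides

  • Data validation: makes sure values have the right types
  • Serialization: converts models to JSON and similar formats
  • Schema generation: produces JSON Schema that can feed API documentation
  • IDE support: enables code completion and type checking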

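4.2 Monitoring Trustcall's Tool Calls (the Spy Class)

Internally, Trustcall calls tools to process the data, and we may want to see exactly what it did. To do that, we create a "spy" class that watches the run:

python
from trustcall import create_extractor
from langchain_openai import ChatOpenAI

# Monitor Trustcall's tool calls
class Spy:
    def __init__(self):
        self.called_tools = []

    def __call__(self, run):
        # Collect information about the tool calls made by the extractor
        q = [run]
        while q:
            r = q.pop()
            if r.child_runs:
                q.extend(r.child_runs)
            if r.run_type == "chat_model":
                self.called_tools.append(
                    r.outputs["generations"][0][0]["message"]["kwargs"]["tool_calls"]
                )

Python Notes

  1. The __init__ method: the class constructor, called automatically when an object is created
  2. The __call__ method: lets an object be called like a function
    python
    spy = Spy()
    spy(some_data)  # same as calling spy.__call__(some_data)
  3. Tree traversal: a list holds the runs that still need to be visited. Because q.pop() removes the last element, the list behaves as a stack and the runs are visited depth-first, not breadth-first. The order does not matter here, since every run is visited exactly once.
    • q.pop(): removes and returns the last element of the list
    • q.extend(list): appends all elements of another list

How the Code Works

The Spy class works as follows:

  1. It initializes an empty list, called_tools, to hold the tool call information
  2. When called, it walks through every run record, including child runs
  3. It picks out all runs of type chat_model
  4. It extracts the tool calls (tool_calls) from each of them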
4.3 Creating the Trustcall Extractor

python
# Initialize the Spy
spy = Spy()

# Initialize the model
model = ChatOpenAI(model="gpt-4o", temperature=0)

# Create the extractor
trustcall_extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True,
)

# Attach the Spy as a listener
trustcall_extractor_see_all_tool_calls = trustcall_extractor.with_listeners(
    on_end=spy
)

Parameters

  • model: the language model to use
  • tools=[Memory]: the available data schemas (tools)
  • tool_choice="Memory": forces the model to use the Memory tool
  • enable_inserts=True: allows new records to be inserted
  • with_listeners(on_end=spy): attaches a listener, so spy is called when the run ends

4.4 Extracting Memories - A Basic Example

python
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage

# Instruction
instruction = """Extract memories from the following conversation:"""

# Conversation
conversation = [
    HumanMessage(content="Hi, I'm Lance."),
    AIMessage(content="Nice to meet you, Lance."),
    HumanMessage(content="This morning I had a nice bike ride in San Francisco.")
]

# Invoke the extractor
result = trustcall_extractor.invoke({
    "messages": [SystemMessage(content=instruction)] + conversation
})

LangChain Message Types

  1. HumanMessage: a message from the user
  2. AIMessage: a reply from the AI
  3. SystemMessage: a system instruction (not shown to the user)

4.5 Inspecting the Extraction Result

python
# The messages contain the tool calls
for m in result["messages"]:
    m.pretty_print()

Output

================================== Ai Message ==================================
Tool Calls:
  Memory (call_NkjwwJGjrgxHzTb7KwD8lTaH)
 Call ID: call_NkjwwJGjrgxHzTb7KwD8lTaH
  Args:
    content: Lance had a nice bike ride in San Francisco this morning.

python
# The responses contain memories that conform to the schema
for m in result["responses"]:
    print(m)

Output

content='Lance had a nice bike ride in San Francisco this morning.'

python
# The metadata contains the tool call ID
for m in result["response_metadata"]:
    print(m)

Output

{'id': 'call_NkjwwJGjrgxHzTb7KwD8lTaH'}

Structure of the Result

  • messages: the AI message that carries the tool calls
  • responses: the list of parsed Pydantic objects
  • response_metadata: metadata for each response (including the ID)

4.6 Updating Existing Memories

Now let's see how to update memories that already exist:

python
# Updated conversation
updated_conversation = [
    AIMessage(content="That's great, what did you do after?"),
    HumanMessage(content="I went to Tartine and ate a croissant."),
    AIMessage(content="What else is on your mind?"),
    HumanMessage(content="I was thinking about my trip to Japan, and going back this winter!"),
]

# Updated instruction
system_msg = """Update existing memories and create new ones based on the following conversation:"""

# Save the existing memories, giving each one an ID, a key (the tool name), and a value
tool_name = "Memory"
existing_memories = [
    (str(i), tool_name, memory.model_dump())
    for i, memory in enumerate(result["responses"])
] if result["responses"] else None

print(existing_memories)

Output

python
[('0',
  'Memory',
  {'content': 'Lance had a nice bike ride in San Francisco this morning.'})]

Python Notes

  1. List comprehension: [expression for item in iterable if condition]

    python
    # Equivalent to:
    existing_memories = []
    if result["responses"]:
        for i, memory in enumerate(result["responses"]):
            existing_memories.append((str(i), tool_name, memory.model_dump()))
    else:
        existing_memories = None
  2. enumerate(): yields the index and the value together

    python
    for i, item in enumerate(['a', 'b', 'c']):
        print(i, item)
    # Output:
    # 0 a
    # 1 b
    # 2 c
  3. model_dump(): converts a Pydantic model to a dictionary

  4. Tuple: (id, tool_name, value) is a tuple with three elements

  5. Conditional (ternary) expression: value if condition else other_value

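4.7 Monitoring the Update with Spy

python
# Invoke the extractor with the updated conversation and the existing memories
result = trustcall_extractor_see_all_tool_calls.invoke({
    "messages": updated_conversation,
    "existing": existing_memories
})

Inspect the tool calls:

python
# Look at the tool calls in the messages
for m in result["messages"]:
    m.pretty_print()

Output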

================================== Ai Message ==================================
Tool Calls:
  Memory (call_bF0w0hE4YZmGyDbuJVe1mh5H)
 Call ID: call_bF0w0hE4YZmGyDbuJVe1mh5H
  Args:
    content: Lance had a nice bike ride in San Francisco this morning. Afterward, he went to Tartine and ate a croissant. He was also thinking about his trip to Japan and going back this winter.
  Memory (call_fQAxxRypV914Xev6nJ9VKw3X)
 Call ID: call_fQAxxRypV914Xev6nJ9VKw3X
  Args:
    content: Lance went to Tartine and ate a croissant. He was also thinking about his trip to Japan and going back this winter.

Note: the first tool call updated the existing memory (the one with ID 0), and the second created a new memory.

Inspect the tool calls captured by Spy:

python
spy.called_tools

Output

python
[[{'name': 'PatchDoc',
   'args': {
       'json_doc_id': '0',
       'planned_edits': '1. Replace the existing content with the updated memory...',
       'patches': [{
           'op': 'replace',
           'path': '/content',
           'value': 'Lance had a nice bike ride in San Francisco this morning. Afterward, he went to Tartine and ate a croissant. He was also thinking about his trip to Japan and going back this winter.'
       }]
   },
   'id': 'call_bF0w0hE4YZmGyDbuJVe1mh5H',
   'type': 'tool_call'},
  {'name': 'Memory',
   'args': {
       'content': 'Lance went to Tartine and ate a croissant. He was also thinking about his trip to Japan and going back this winter.'
   },
   'id': 'call_fQAxxRypV914Xev6nJ9VKw3X',
   'type': 'tool_call'}]]

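Key Findings

  1. The PatchDoc tool: a tool Trustcall uses internally to update an existing document

    • json_doc_id: '0': the ID of the document to update
    • planned_edits: a description of the planned edits
    • patches: JSON Patch operations (the RFC 6902 standard)
      • op: 'replace': a replace operation
      • path: '/content': the path of the field to modify
      • value: the new value
  2. The Memory tool: creates a new memory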
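
To make the patch format concrete, here is a minimal sketch of what applying such a patch to a stored memory amounts to. apply_patches is a hypothetical helper written for this illustration; it handles only replace and add on top-level keys, which is a small subset of RFC 6902 and not Trustcall's own implementation.

```python
import copy


def apply_patches(doc, patches):
    """Apply 'replace' and 'add' operations on top-level keys (illustrative subset only)."""
    updated = copy.deepcopy(doc)  # leave the stored document untouched
    for patch in patches:
        key = patch["path"].lstrip("/")  # '/content' -> 'content'
        if patch["op"] == "replace":
            if key not in updated:
                raise KeyError(f"cannot replace missing key: {key}")
            updated[key] = patch["value"]
        elif patch["op"] == "add":
            updated[key] = patch["value"]
        else:
            raise ValueError(f"unsupported op: {patch['op']}")
    return updated


existing = {"content": "Lance had a nice bike ride in San Francisco this morning."}
patches = [{
    "op": "replace",
    "path": "/content",
    "value": "Lance had a nice bike ride in San Francisco this morning. "
             "Afterward, he went to Tartine and ate a croissant.",
}]

patched = apply_patches(existing, patches)
print(patched["content"])
```

Sending a small patch instead of regenerating the whole document keeps the update focused on the fields that actually changed.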

4.8 A Helper for Extracting Tool Information

To make Trustcall's operations easier to read, we create a helper function:

python
def extract_tool_info(tool_calls, schema_name="Memory"):
    """Extract information from tool calls, covering both patches and new memories.

    Args:
        tool_calls: list of tool calls made by the model
        schema_name: name of the schema tool (e.g. "Memory", "ToDo", "Profile")
    """

    # Initialize the list of changes
    changes = []

    for call_group in tool_calls:
        for call in call_group:
            if call['name'] == 'PatchDoc':
                changes.append({
                    'type': 'update',
                    'doc_id': call['args']['json_doc_id'],
                    'planned_edits': call['args']['planned_edits'],
                    'value': call['args']['patches'][0]['value']
                })
            elif call['name'] == schema_name:
                changes.append({
                    'type': 'new',
                    'value': call['args']
                })

    # Format the results as a single string
    result_parts = []
    for change in changes:
        if change['type'] == 'update':
            result_parts.append(
                f"Document {change['doc_id']} updated:\n"
                f"Plan: {change['planned_edits']}\n"
                f"Added content: {change['value']}"
            )
        else:
            result_parts.append(
                f"New {schema_name} created:\n"
                f"Content: {change['value']}"
            )

    return "\n\n".join(result_parts)

# Use the helper to inspect the changes
schema_name = "Memory"
changes = extract_tool_info(spy.called_tools, schema_name)
print(changes)

Output

Document 0 updated:
Plan: 1. Replace the existing content with the updated memory that includes the new activities: going to Tartine for a croissant and thinking about going back to Japan this winter.
Added content: Lance had a nice bike ride in San Francisco this morning. Afterward, he went to Tartine and ate a croissant. He was also thinking about his trip to Japan and going back this winter.

New Memory created:
Content: {'content': 'Lance went to Tartine and ate a croissant. He was also thinking about his trip to Japan and going back this winter.'}

Python Notes

  1. Default parameter values: schema_name="Memory"

    • The argument can be omitted at the call site, in which case the default is used
    • For example: extract_tool_info(calls) is the same as extract_tool_info(calls, "Memory")
  2. The string join() method

    python
    "\n\n".join(['a', 'b', 'c'])  # returns "a\n\nb\n\nc"
  3. f-strings (formatted string literals)

    python
    name = "Alice"
    age = 30
    print(f"Name: {name}, Age: {age}")  # Name: Alice, Age: 30

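V. Building the task_mAIstro Agent

Now that we understand how Trustcall works, let's build the complete agent.

5.1 Defining the Update Type

First, define which kinds of memory the agent can update:

python
from typing import TypedDict, Literal

# Tool for updating memory
class UpdateMemory(TypedDict):
    """ Decision on what memory type to update """
    update_type: Literal['user', 'todo', 'instructions']

Python Notes

  1. TypedDict: declares a dictionary type with specific fields

    python
    # UpdateMemory is a dict that is expected to contain the update_type key
    update = UpdateMemory(update_type='user')
  2. Literal: restricts a value to one of a fixed set of options

    python
    Literal['user', 'todo', 'instructions']
    # update_type may only be one of these three values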
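
One detail worth knowing is that TypedDict and Literal are hints for static type checkers; Python does not enforce them at runtime. The sketch below illustrates this and shows one way to check a value by hand with typing.get_args. The names UpdateType, ALLOWED, and is_valid_update are introduced only for this example.

```python
from typing import Literal, TypedDict, get_args

UpdateType = Literal["user", "todo", "instructions"]


class UpdateMemory(TypedDict):
    update_type: UpdateType


# get_args returns the values listed inside the Literal
ALLOWED = get_args(UpdateType)


def is_valid_update(payload: dict) -> bool:
    """Manual runtime check, since TypedDict itself does not validate."""
    return payload.get("update_type") in ALLOWED


good = UpdateMemory(update_type="todo")
# No exception here: a type checker would flag this line, but Python runs it
bad = UpdateMemory(update_type="calendar")  # type: ignore[typeddict-item]

print(type(good), is_valid_update(good), is_valid_update(bad))
```

In the agent, the schema is passed to the model as a tool definition, so the Literal mainly serves to tell the model which values it may produce.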

5.2 Defining the Data Schemas

5.2.1 The User Profile Schema

python
from typing import Optional
from pydantic import BaseModel, Field

class Profile(BaseModel):
    """This is the profile of the user you are chatting with"""
    name: Optional[str] = Field(
        description="The user's name",
        default=None
    )
    location: Optional[str] = Field(
        description="The user's location",
        default=None
    )
    job: Optional[str] = Field(
        description="The user's job",
        default=None
    )
    connections: list[str] = Field(
        description="Personal connection of the user, such as family members, friends, or coworkers",
        default_factory=list
    )
    interests: list[str] = Field(
        description="Interests that the user has",
        default_factory=list
    )

Python Notes

  1. Optional[str]: the field may be a string or None

    python
    # Equivalent to:
    from typing import Union
    name: Union[str, None]
  2. default=None: the field's default value is None

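  3. default_factory=list: uses a factory function to create the default value

    python
    # Each instance gets its own new empty list.
    # In plain Python, a mutable default such as [] is shared between calls or
    # instances. Pydantic copies mutable defaults per instance, so default=[]
    # would not be shared there, but default_factory states the intent explicitly.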
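
The shared-list problem that default_factory is designed around is easiest to see in plain Python, without Pydantic. The sketch below uses two hypothetical functions to contrast a mutable default argument, which is created once and reused, with the usual "create it inside" pattern.

```python
def add_interest_shared(interest, interests=[]):
    """The default list is created once, when the function is defined."""
    interests.append(interest)
    return interests


def add_interest_fresh(interest, interests=None):
    """A new list is created on every call that omits the argument."""
    if interests is None:
        interests = []
    interests.append(interest)
    return interests


first = add_interest_shared("biking")
second = add_interest_shared("baking")
print(first is second, second)  # both names point at the same list

third = add_interest_fresh("biking")
fourth = add_interest_fresh("baking")
print(third is fourth, third, fourth)
```

A factory such as list plays the same role as the None check: it is called each time a default is needed, so no two objects end up holding the same list.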

5.2.2 The ToDo Schema

python
from datetime import datetime
from typing import Literal

class ToDo(BaseModel):
    task: str = Field(
        description="The task to be completed."
    )
    time_to_complete: Optional[int] = Field(
        description="Estimated time to complete the task (minutes)."
    )
    deadline: Optional[datetime] = Field(
        description="When the task needs to be completed by (if applicable)",
        default=None
    )
    solutions: list[str] = Field(
        description="List of specific, actionable solutions (e.g., specific ideas, service providers, or concrete options relevant to completing the task)",
        min_items=1,
        default_factory=list
    )
    status: Literal["not started", "in progress", "done", "archived"] = Field(
        description="Current status of the task",
        default="not started"
    )

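Python Notes

  1. datetime: Python's date and time type

    python
    from datetime import datetime
    now = datetime.now()
    print(now)  # 2024-11-04 13:00:00.123456
  2. min_items=1: a Pydantic validation rule requiring the list to have at least 1 element (in Pydantic v2 this constraint is named min_length, and min_items is deprecated)

  3. Enumerated status: Literal restricts the allowed status values

    python
    # status may only be one of these four values:
    # "not started", "in progress", "done", "archived"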

5.3 Creating the Trustcall Extractor

python
from trustcall import create_extractor
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o", temperature=0)

# Create the Trustcall extractor used to update the user Profile
profile_extractor = create_extractor(
    model,
    tools=[Profile],
    tool_choice="Profile",
)

Note: only the Profile extractor is created here. The ToDo extractor is created later, inside its node function.

5.4 Defining the System Prompt

This is the agent's core instruction. It tells the agent how to behave:

python
MODEL_SYSTEM_MESSAGE = """You are a helpful chatbot.

You are designed to be a companion to a user, helping them keep track of their ToDo list.

You have a long term memory which keeps track of three things:
1. The user's profile (general information about them)
2. The user's ToDo list
3. General instructions for updating the ToDo list

Here is the current User Profile (may be empty if no information has been collected yet):
<user_profile>
{user_profile}
</user_profile>

Here is the current ToDo List (may be empty if no tasks have been added yet):
<todo>
{todo}
</todo>

Here are the current user-specified preferences for updating the ToDo list (may be empty if no preferences have been specified yet):
<instructions>
{instructions}
</instructions>

Here are your instructions for reasoning about the user's messages:

1. Reason carefully about the user's messages as presented below.

2. Decide whether any of your long-term memory should be updated:
- If personal information was provided about the user, update the user's profile by calling UpdateMemory tool with type `user`
- If tasks are mentioned, update the ToDo list by calling UpdateMemory tool with type `todo`
- If the user has specified preferences for how to update the ToDo list, update the instructions by calling UpdateMemory tool with type `instructions`

3. Tell the user that you have updated your memory, if appropriate:
- Do not tell the user you have updated the user's profile
- Tell the user when you update the todo list
- Do not tell the user that you have updated instructions

4. Err on the side of updating the todo list. No need to ask for explicit permission.

5. Respond naturally to the user after a tool call was made to save memories, or if no tool call was made."""

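Prompt Design Notes

  1. Clear role definition: "You are a helpful chatbot..."
  2. Context: the prompt shows the current Profile, ToDo list, and Instructions
  3. Explicit instructions: when to update memory and how to respond to the user
  4. XML tags: keep the content clearly structured (<user_profile>, <todo>, <instructions>)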

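5.5 Defining the Trustcall Instruction

python
TRUSTCALL_INSTRUCTION = """Reflect on the following interaction.

Use the provided tools to retain any necessary memories about the user.

Use parallel tool calling to handle updates and insertions simultaneously.

System Time: {time}"""

This instruction is passed to Trustcall and tells it to:

  • Reflect on the conversation
  • Use the tools to save memories
  • Handle updates and insertions in parallel
  • Take the current time into account (useful for date-related tasks)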

5.6 Defining the Instruction for Updating Instructions

python
CREATE_INSTRUCTIONS = """Reflect on the following interaction.

Based on this interaction, update your instructions for how to update ToDo list items.

Use any feedback from the user to update how they like to have items added, etc.

Your current instructions are:

<current_instructions>
{current_instructions}
</current_instructions>"""

5.7 Defining the Node Functions

5.7.1 The task_mAIstro Node (Main Node)

This is the agent's core node. It talks to the user and decides whether memory should be updated:

python
from langgraph.graph import MessagesState
from langchain_core.runnables import RunnableConfig
from langgraph.store.base import BaseStore

def task_mAIstro(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Load memories from the store and use them to personalize the chatbot's response."""

    # Get the user ID from the config
    user_id = config["configurable"]["user_id"]

    # Retrieve the Profile memory from the store
    namespace = ("profile", user_id)
    memories = store.search(namespace)
    if memories:
        user_profile = memories[0].value
    else:
        user_profile = None

    # Retrieve the task memories from the store
    namespace = ("todo", user_id)
    memories = store.search(namespace)
    todo = "\n".join(f"{mem.value}" for mem in memories)

    # Retrieve the custom instructions
    namespace = ("instructions", user_id)
    memories = store.search(namespace)
    if memories:
        instructions = memories[0].value
    else:
        instructions = ""

    # Format the system message
    system_msg = MODEL_SYSTEM_MESSAGE.format(
        user_profile=user_profile,
        todo=todo,
        instructions=instructions
    )

    # Generate a response using the memories and the chat history
    response = model.bind_tools(
        [UpdateMemory],
        parallel_tool_calls=False
    ).invoke(
        [SystemMessage(content=system_msg)] + state["messages"]
    )

    return {"messages": [response]}

LangGraph Notes

  1. MessagesState: LangGraph's built-in state type, which holds a list of messages

    python
    state = {
        "messages": [HumanMessage(...), AIMessage(...), ...]
    }
  2. RunnableConfig: the configuration object, which carries runtime information

    python
    config = {
        "configurable": {
            "thread_id": "1",
            "user_id": "Lance"
        }
    }
  3. BaseStore: the storage interface used to access long-term memory

    • store.search(namespace): searches the memories in a namespace
    • store.get(namespace, key): fetches one specific memory
    • store.put(namespace, key, value): saves a memory
  4. Namespace: a hierarchical structure expressed as a tuple

    python
    ("profile", "Lance")  # the profile of user Lance
    ("todo", "Lance")     # the todos of user Lance
    ("instructions", "Lance")  # the instructions of user Lance
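
The namespace idea can be pictured with a small dictionary-backed stand-in. ToyStore below is a hypothetical class written for this illustration; it only imitates the put / get / search calls listed above and is not the LangGraph implementation.

```python
from types import SimpleNamespace


class ToyStore:
    """Dict-backed stand-in for a namespaced store (illustration only)."""

    def __init__(self):
        self._data = {}

    def put(self, namespace, key, value):
        # A (namespace, key) pair identifies exactly one item
        self._data[(namespace, key)] = value

    def get(self, namespace, key):
        if (namespace, key) not in self._data:
            return None
        return SimpleNamespace(key=key, value=self._data[(namespace, key)])

    def search(self, namespace):
        # Return every item stored under this namespace
        return [
            SimpleNamespace(key=k, value=v)
            for (ns, k), v in self._data.items()
            if ns == namespace
        ]


store = ToyStore()
store.put(("profile", "Lance"), "p1", {"name": "Lance", "location": "SF"})
store.put(("todo", "Lance"), "t1", {"task": "Book swim lessons"})
store.put(("todo", "Lance"), "t2", {"task": "Buy groceries"})
store.put(("todo", "Alice"), "t1", {"task": "File taxes"})

lance_todos = store.search(("todo", "Lance"))
print([item.value["task"] for item in lance_todos])
print(store.get(("profile", "Lance"), "p1").value)
```

Because the user ID is part of the namespace, each user's profile, todos, and instructions stay separate even when the same keys are reused.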

Python Notes

  1. "\n".join(): joins strings with newlines

    python
    todos = ["Buy milk", "Call mom"]
    result = "\n".join(todos)
    # Result:
    # Buy milk
    # Call mom
  2. Generator expressions

    python
    "\n".join(f"{mem.value}" for mem in memories)
    # Equivalent to:
    result = []
    for mem in memories:
        result.append(f"{mem.value}")
    "\n".join(result)
  3. model.bind_tools(): binds tools to the model

    • parallel_tool_calls=False: disallows parallel tool calls (at most one tool call per turn)

5.7.2 The update_profile Node

Updates the user's profile:

python
import uuid
from langchain_core.messages import merge_message_runs

def update_profile(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Reflect on the chat history and update the memory collection."""

    # Get the user ID
    user_id = config["configurable"]["user_id"]

    # Define the namespace for the memories
    namespace = ("profile", user_id)

    # Retrieve the most recent memories for context
    existing_items = store.search(namespace)

    # Format the existing memories for the Trustcall extractor
    tool_name = "Profile"
    existing_memories = (
        [(existing_item.key, tool_name, existing_item.value)
         for existing_item in existing_items]
        if existing_items
        else None
    )

    # Merge the chat history and the instruction
    TRUSTCALL_INSTRUCTION_FORMATTED = TRUSTCALL_INSTRUCTION.format(
        time=datetime.now().isoformat()
    )
    updated_messages = list(merge_message_runs(
        messages=[SystemMessage(content=TRUSTCALL_INSTRUCTION_FORMATTED)] + state["messages"][:-1]
    ))

    # Invoke the extractor
    result = profile_extractor.invoke({
        "messages": updated_messages,
        "existing": existing_memories
    })

    # Save the memories from Trustcall to the store
    for r, rmeta in zip(result["responses"], result["response_metadata"]):
        store.put(
            namespace,
            rmeta.get("json_doc_id", str(uuid.uuid4())),
            r.model_dump(mode="json"),
        )

    # Respond to the tool call
    tool_calls = state['messages'][-1].tool_calls
    return {
        "messages": [{
            "role": "tool",
            "content": "updated profile",
            "tool_call_id": tool_calls[0]['id']
        }]
    }

Python Notes

  1. uuid.uuid4(): generates a random UUID (universally unique identifier)

    python
    import uuid
    id = str(uuid.uuid4())  # e.g. '550e8400-e29b-41d4-a716-446655440000'
  2. datetime.now().isoformat(): returns a timestamp in ISO 8601 format

    python
    from datetime import datetime
    time = datetime.now().isoformat()
    # e.g. '2024-11-04T13:30:00.123456'
  3. The zip() function: iterates over several sequences in parallel

    python
    names = ['Alice', 'Bob']
    ages = [25, 30]
    for name, age in zip(names, ages):
        print(f"{name} is {age} years old")
    # Output:
    # Alice is 25 years old
    # Bob is 30 years old
  4. The dict get() method: reads a value safely, with a fallback default

    python
    d = {'a': 1}
    d.get('a', 0)  # returns 1
    d.get('b', 0)  # returns 0 (the key does not exist)
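
The get() call with a UUID fallback is what lets one loop handle both updates and inserts in the node above. Here is a minimal sketch of that key choice, with a plain dict standing in for the store. choose_key is a hypothetical helper, and the metadata dictionaries are assumed shapes modeled on the rmeta.get("json_doc_id", ...) call, not captured Trustcall output.

```python
import uuid


def choose_key(response_metadata: dict) -> str:
    """Reuse the existing document ID if present, otherwise mint a new key."""
    return response_metadata.get("json_doc_id", str(uuid.uuid4()))


# Plain dict standing in for one namespace of the store
store = {"0": {"task": "Book swim lessons", "status": "not started"}}

# (value, metadata) pairs; the second one has no json_doc_id, so it is treated as new
results = [
    ({"task": "Book swim lessons", "status": "done"}, {"id": "call_1", "json_doc_id": "0"}),
    ({"task": "Buy groceries", "status": "not started"}, {"id": "call_2"}),
]

for value, meta in results:
    store[choose_key(meta)] = value

print(len(store), store["0"]["status"])
```

The patched document overwrites key "0", while the new item lands under a fresh random key, so existing records are never duplicated.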

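LangChain and LangGraph Notes

  1. merge_message_runs() (from langchain_core.messages): merges consecutive messages that have the same role

    python
    # Before merging:
    [HumanMessage("Hi"), HumanMessage("How are you?")]
    # After merging:
    [HumanMessage("Hi\nHow are you?")]
  2. Tool message: the message that answers a tool call

    python
    {
        "role": "tool",          # the role is tool
        "content": "result",     # the result of running the tool
        "tool_call_id": "xxx"    # the ID of the tool call being answered
    }
  3. state["messages"][:-1]: all messages except the last one

    python
    messages = ['a', 'b', 'c', 'd']
    messages[:-1]  # ['a', 'b', 'c']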

5.7.3 The update_todos Node

Updates the ToDo list (similar to update_profile, but monitored with Spy):

python
def update_todos(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Reflect on the chat history and update the memory collection."""

    # Get the user ID
    user_id = config["configurable"]["user_id"]

    # Define the namespace for the memories
    namespace = ("todo", user_id)

    # Retrieve the most recent memories for context
    existing_items = store.search(namespace)

    # Format the existing memories for the Trustcall extractor
    tool_name = "ToDo"
    existing_memories = (
        [(existing_item.key, tool_name, existing_item.value)
         for existing_item in existing_items]
        if existing_items
        else None
    )

    # Merge the chat history and the instruction
    TRUSTCALL_INSTRUCTION_FORMATTED = TRUSTCALL_INSTRUCTION.format(
        time=datetime.now().isoformat()
    )
    updated_messages = list(merge_message_runs(
        messages=[SystemMessage(content=TRUSTCALL_INSTRUCTION_FORMATTED)] + state["messages"][:-1]
    ))

    # Initialize the Spy to monitor Trustcall's tool calls
    spy = Spy()

    # Create the Trustcall extractor used to update the ToDo list
    todo_extractor = create_extractor(
        model,
        tools=[ToDo],
        tool_choice=tool_name,
        enable_inserts=True
    ).with_listeners(on_end=spy)

    # Invoke the extractor
    result = todo_extractor.invoke({
        "messages": updated_messages,
        "existing": existing_memories
    })

    # Save the memories from Trustcall to the store
    for r, rmeta in zip(result["responses"], result["response_metadata"]):
        store.put(
            namespace,
            rmeta.get("json_doc_id", str(uuid.uuid4())),
            r.model_dump(mode="json"),
        )

    # Respond to the tool call, confirming the update
    tool_calls = state['messages'][-1].tool_calls

    # Extract Trustcall's changes and add them to the ToolMessage returned to task_mAIstro
    todo_update_msg = extract_tool_info(spy.called_tools, tool_name)
    return {
        "messages": [{
            "role": "tool",
            "content": todo_update_msg,
            "tool_call_id": tool_calls[0]['id']
        }]
    }

Key Differences

Compared with update_profile, this node:

  1. Creates a ToDo-specific extractor (with enable_inserts=True)
  2. Attaches a Spy listener
  3. Returns a ToolMessage with the detailed changes (not just "updated todo")

5.7.4 The update_instructions Node

Updates the user's preferences:

python
def update_instructions(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Reflect on the chat history and update the memory collection."""

    # Get the user ID
    user_id = config["configurable"]["user_id"]

    namespace = ("instructions", user_id)

    # Get the existing memory
    existing_memory = store.get(namespace, "user_instructions")

    # Format the memory into the system prompt
    system_msg = CREATE_INSTRUCTIONS.format(
        current_instructions=existing_memory.value if existing_memory else None
    )

    # Ask the model to generate the new instructions
    new_memory = model.invoke([
        SystemMessage(content=system_msg)
    ] + state['messages'][:-1] + [
        HumanMessage(content="Please update the instructions based on the conversation")
    ])

    # Overwrite the existing memory in the store
    key = "user_instructions"
    store.put(namespace, key, {"memory": new_memory.content})

    # Respond to the tool call
    tool_calls = state['messages'][-1].tool_calls
    return {
        "messages": [{
            "role": "tool",
            "content": "updated instructions",
            "tool_call_id": tool_calls[0]['id']
        }]
    }

Differences from the Other Nodes

  1. It does not use Trustcall (the instructions are free text, not structured data)
  2. It uses a fixed key: "user_instructions"
  3. It has the model generate the new instruction text directly

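5.8 Defining the Routing Function

The routing function decides where the flow goes next:

python
from typing import Literal
from langgraph.graph import END

def route_message(
    state: MessagesState,
    config: RunnableConfig,
    store: BaseStore
) -> Literal[END, "update_todos", "update_instructions", "update_profile"]:
    """Decide, based on the latest message, whether and which memory to update."""

    message = state['messages'][-1]

    # If there is no tool call, finish
    if len(message.tool_calls) == 0:
        return END
    else:
        # Route according to the type requested in the tool call
        tool_call = message.tool_calls[0]
        if tool_call['args']['update_type'] == "user":
            return "update_profile"
        elif tool_call['args']['update_type'] == "todo":
            return "update_todos"
        elif tool_call['args']['update_type'] == "instructions":
            return "update_instructions"
        else:
            raise ValueError

LangGraph Notes

  1. END: a special marker meaning that graph execution ends

    python
    from langgraph.graph import END
  2. Return value of a conditional edge

    • Returning a node name (a string): jumps to that node
    • Returning END: ends execution
  3. Type annotation: Literal[END, "node1", "node2"]

    • Restricts the return value to one of these options

Logic

  1. Look at the last message (the AI's response)
  2. If it has no tool call → finish (the reply goes straight back to the user)
  3. If it has a tool call → route to the matching node based on update_type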
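
The same routing decision can be sketched with plain data and a lookup table, which makes the mapping from update_type to node name easy to test in isolation. In this sketch END is a local placeholder string and tool calls are plain dicts; ROUTES and route are names introduced only for the example.

```python
# Placeholder sentinel for this sketch; the real code imports END from langgraph.graph
END = "__end__"

# update_type value -> name of the node that handles it
ROUTES = {
    "user": "update_profile",
    "todo": "update_todos",
    "instructions": "update_instructions",
}


def route(tool_calls):
    """Return the next node name, or END when the model made no tool call."""
    if not tool_calls:
        return END
    update_type = tool_calls[0]["args"]["update_type"]
    try:
        return ROUTES[update_type]
    except KeyError:
        raise ValueError(f"unknown update_type: {update_type!r}") from None


print(route([]))
print(route([{"name": "UpdateMemory", "args": {"update_type": "todo"}}]))
```

A table keeps the routing rules in one place, and the explicit error message makes an unexpected update_type easier to diagnose than a bare ValueError.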

5.9 Building the State Graph

Now we put all the pieces together:

python
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.memory import InMemoryStore

# Create the graph builder
builder = StateGraph(MessagesState)

# Add the nodes
builder.add_node(task_mAIstro)
builder.add_node(update_todos)
builder.add_node(update_profile)
builder.add_node(update_instructions)

# Define the edges
builder.add_edge(START, "task_mAIstro")
builder.add_conditional_edges("task_mAIstro", route_message)
builder.add_edge("update_todos", "task_mAIstro")
builder.add_edge("update_profile", "task_mAIstro")
builder.add_edge("update_instructions", "task_mAIstro")

# Store for long-term (across-thread) memory
across_thread_memory = InMemoryStore()

# Checkpointer for short-term (within-thread) memory
within_thread_memory = MemorySaver()

# Compile the graph
graph = builder.compile(
    checkpointer=within_thread_memory,
    store=across_thread_memory
)

# Visualize the graph structure
from IPython.display import Image, display
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))

Graph Structure

START → task_mAIstro → [conditional routing]
                         ├→ END
                         ├→ update_profile → task_mAIstro
                         ├→ update_todos → task_mAIstro
                         └→ update_instructions → task_mAIstro

LangGraph Notes

  1. StateGraph(MessagesState): creates the state graph

    • MessagesState: defines the type of the state
  2. add_node(function): adds a node

    • The node name defaults to the function name
    • A name can also be given explicitly: add_node("custom_name", function)
  3. add_edge(from_node, to_node): adds a fixed edge

    • from_node always goes to to_node
  4. add_conditional_edges(from_node, routing_function): adds a conditional edge

    • The return value of routing_function decides the next node
  5. compile(): compiles the graph into an executable runtime

    • checkpointer: saves checkpoints (short-term memory)
    • store: holds stored data (long-term memory)
  6. START: a special marker for the graph's entry point

  7. Visualization

    • get_graph(xray=1): returns the detailed structure of the graph
    • draw_mermaid_png(): renders the Mermaid diagram of the graph as a PNG image

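VI. Hands-on Demo

6.1 Configuration and Initialization

python
# The thread ID selects the short-term memory
# The user ID selects the long-term memory
config = {
    "configurable": {
        "thread_id": "1",      # session ID
        "user_id": "Lance"     # user ID
    }
}

About the Configuration

  • thread_id: identifies one conversation session; messages with the same thread_id share short-term memory
  • user_id: identifies the user; sessions with the same user_id share long-term memory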
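
The difference between the two IDs can be pictured with two ordinary dictionaries: one keyed by thread_id for the conversation history, and one keyed by user_id for what should survive across sessions. This is only a toy model under that assumption; the chat function and both dictionaries are hypothetical and do not reflect how MemorySaver or InMemoryStore are implemented.

```python
from collections import defaultdict

history_by_thread = defaultdict(list)  # short-term: one message list per thread_id
profile_by_user = {}                   # long-term: one profile dict per user_id


def chat(thread_id, user_id, message, remember=None):
    """Record a message in the thread and optionally save facts about the user."""
    history_by_thread[thread_id].append(message)
    if remember:
        profile_by_user.setdefault(user_id, {}).update(remember)
    # Return what a node would "see": thread history length and the user's profile
    return len(history_by_thread[thread_id]), profile_by_user.get(user_id, {})


# First session: the user introduces themselves
seen_1 = chat("1", "Lance", "My name is Lance.", remember={"name": "Lance"})

# New session, same user: the history starts over, but the profile is still there
seen_2 = chat("2", "Lance", "What tasks do I have?")

print(seen_1)
print(seen_2)
```

Starting a new thread_id gives a fresh conversation, while keeping the same user_id is what lets the agent still know who it is talking to.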

6.2 Creating the User Profile

python
# User input that creates a Profile memory
input_messages = [HumanMessage(
    content="My name is Lance. I live in SF with my wife. I have a 1 year old daughter."
)]

# Run the graph
for chunk in graph.stream(
    {"messages": input_messages},
    config,
    stream_mode="values"
):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
My name is Lance. I live in SF with my wife. I have a 1 year old daughter.

================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_rOuw3bLYjFFKuSVWsIHF27k5)
 Call ID: call_rOuw3bLYjFFKuSVWsIHF27k5
  Args:
    update_type: user

================================= Tool Message =================================
updated profile

================================== Ai Message ==================================
Got it! How can I assist you today, Lance?

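Execution Flow

  1. Human Message: the user's input
  2. AI Message (tool call): task_mAIstro decides to update the user profile
    • It calls the UpdateMemory tool
    • update_type: "user"
  3. Tool Message: the update_profile node runs and returns its result
  4. AI Message (response): task_mAIstro generates a natural-language reply

LangGraph Notes

  1. graph.stream(): runs the graph in streaming mode

    python
    graph.stream(
        input_data,           # input data
        config,               # configuration
        stream_mode="values"  # stream mode
    )
  2. stream_mode="values"

    • Yields the full state after each state update
    • Other options include "updates" (yields only the part that changed)
  3. Streamed output

    • Each chunk is one state update
    • chunk["messages"][-1] is the most recently added message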
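
The difference between the two stream modes can be imitated with a plain generator. run_steps below is a toy simulation written for this note, not LangGraph code, and the chunk shapes are a simplified assumption: "values" yields the whole message list each time, while "updates" yields only what a step added, keyed by the step's name.

```python
def run_steps(initial_messages, steps, stream_mode="values"):
    """Yield after each step: the full state for 'values', only the new part for 'updates'."""
    state = {"messages": list(initial_messages)}
    if stream_mode == "values":
        # The input state is emitted first, before any step runs
        yield {"messages": list(state["messages"])}
    for node_name, new_message in steps:
        state["messages"].append(new_message)
        if stream_mode == "values":
            yield {"messages": list(state["messages"])}
        else:
            yield {node_name: {"messages": [new_message]}}


steps = [
    ("task_mAIstro", "tool call: UpdateMemory(user)"),
    ("update_profile", "updated profile"),
    ("task_mAIstro", "Got it! How can I assist you today, Lance?"),
]

value_chunks = list(run_steps(["My name is Lance."], steps, stream_mode="values"))
update_chunks = list(run_steps(["My name is Lance."], steps, stream_mode="updates"))

# With 'values', the last message of each chunk is the newest one
for chunk in value_chunks:
    print(chunk["messages"][-1])
```

This is why the demo prints chunk["messages"][-1]: with full-state chunks, printing the last message of each chunk shows every message exactly once, in order.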

6.3 Adding a ToDo Item

python
# User input that creates a ToDo
input_messages = [HumanMessage(
    content="My wife asked me to book swim lessons for the baby."
)]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
My wife asked me to book swim lessons for the baby.

================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_VjLbRpbLqniJ8we2CNKQ0m3P)
 Call ID: call_VjLbRpbLqniJ8we2CNKQ0m3P
  Args:
    update_type: todo

================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Book swim lessons for 1-year-old daughter.',
          'time_to_complete': 30,
          'solutions': ['Check local swim schools in SF',
                       'Look for baby swim classes online',
                       'Ask friends for recommendations'],
          'status': 'not started'}

================================== Ai Message ==================================
I've added "Book swim lessons for your 1-year-old daughter" to your ToDo list.
If you need any help with that, just let me know!

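Analysis

  1. The Agent recognizes this as a task and calls the UpdateMemory tool (update_type: "todo")
  2. The update_todos node is invoked
  3. Trustcall creates a new ToDo item containing:
    • A task description
    • An estimated time to complete (30 minutes)
    • Concrete solutions
    • A status (not started)
  4. The Agent tells the user the task has been added

What the Agent does on its own

  • Infers the time needed to complete the task
  • Generates concrete, actionable solutions
  • Sets the status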

6.4 Updating Instructions (Procedural Memory)

python
# User input that should update the instructions for creating ToDos
input_messages = [HumanMessage(
    content="When creating or updating ToDo items, include specific local businesses / vendors."
)]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
When creating or updating ToDo items, include specific local businesses / vendors.

================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_22w3V3Krhjf8WxDeH9YrQILa)
 Call ID: call_22w3V3Krhjf8WxDeH9YrQILa
  Args:
    update_type: instructions

================================= Tool Message =================================
updated instructions

================================== Ai Message ==================================
Got it! I'll make sure to include specific local businesses or vendors in San Francisco
when creating or updating your ToDo items. Let me know if there's anything else you need!

Inspect the saved Instructions:

python
user_id = "Lance"

# Search the instructions namespace
for memory in across_thread_memory.search(("instructions", user_id)):
    print(memory.value)

Output

python
{'memory': '<current_instructions>\nWhen creating or updating ToDo list items for Lance, include specific local businesses or vendors in San Francisco. For example, when adding a task like booking swim lessons, suggest local swim schools or classes in the area.\n</current_instructions>'}

What procedural memory does

This instruction changes how the Agent creates ToDo items from now on. Let's verify that.
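
One way to picture why a stored instruction affects later turns: on each turn the saved text is read back and injected into the system prompt. The sketch below is a simplified illustration under that assumption. `SYSTEM_TEMPLATE` and `build_system_prompt` are hypothetical names; only the `memory` key mirrors the stored value shown above.

```python
# Hypothetical, shortened template; the real prompt is listed in the appendix.
SYSTEM_TEMPLATE = (
    "You help the user manage a ToDo list.\n"
    "<instructions>\n{instructions}\n</instructions>"
)


def build_system_prompt(stored_items):
    """Join stored instruction memories and inject them into the prompt."""
    instructions = "\n".join(item["memory"] for item in stored_items)
    return SYSTEM_TEMPLATE.format(instructions=instructions or "(none)")


stored = [
    {"memory": "Include specific local businesses or vendors in San Francisco."}
]
prompt = build_system_prompt(stored)
print(prompt)
```

Because the instruction text is part of the prompt on every subsequent turn, the model sees the preference whenever it creates or updates a ToDo.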

6.5 Verifying the Effect of the Instructions

python
# User input with a new ToDo
input_messages = [HumanMessage(
    content="I need to fix the jammed electric Yale lock on the door."
)]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
I need to fix the jammed electric Yale lock on the door.

================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_7ooNemi3d6qWMfjf2g2h97EF)
 Call ID: call_7ooNemi3d6qWMfjf2g2h97EF
  Args:
    update_type: todo

================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Fix the jammed electric Yale lock on the door.',
          'time_to_complete': 60,
          'solutions': ['Contact a local locksmith in SF',
                       "Check Yale's customer support for troubleshooting",
                       'Look for repair guides online'],
          'status': 'not started'}

Document ed0af900-52fa-4f15-907c-1aed1e17b0ce updated:
Plan: Add specific local businesses or vendors to the solutions for booking swim lessons.
Added content: ['Check local swim schools in SF',
                'Look for baby swim classes online',
                'Ask friends for recommendations',
                'Contact La Petite Baleen Swim School',
                'Check with SF Recreation and Parks for classes']

================================== Ai Message ==================================
I've added "Fix the jammed electric Yale lock on the door" to your ToDo list.
If you need any specific recommendations or help, feel free to ask!

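Key findings

  1. A new ToDo was created: fix the door lock

    • Its solutions include "Contact a local locksmith in SF" (specific to the user's area)
  2. An existing ToDo was updated automatically: book swim lessons

    • Specific local businesses were added:
      • "Contact La Petite Baleen Swim School"
      • "Check with SF Recreation and Parks for classes"

This shows what procedural memory buys us: the Agent learned the user's preference and applied it to every relevant task, existing ones included.

6.6 Listing All ToDo Items

python
user_id = "Lance"

# Search all ToDo items
for memory in across_thread_memory.search(("todo", user_id)):
    print(memory.value)

Output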

python
{'task': 'Book swim lessons for 1-year-old daughter.',
 'time_to_complete': 30,
 'deadline': None,
 'solutions': ['Check local swim schools in SF',
               'Look for baby swim classes online',
               'Ask friends for recommendations',
               'Contact La Petite Baleen Swim School',
               'Check with SF Recreation and Parks for classes'],
 'status': 'not started'}

{'task': 'Fix the jammed electric Yale lock on the door.',
 'time_to_complete': 60,
 'deadline': None,
 'solutions': ['Contact a local locksmith in SF',
               "Check Yale's customer support for troubleshooting",
               'Look for repair guides online'],
 'status': 'not started'}

6.7 Updating an Existing ToDo (Adding a Deadline)

python
# User input that should update an existing ToDo
input_messages = [HumanMessage(
    content="For the swim lessons, I need to get that done by end of November."
)]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
For the swim lessons, I need to get that done by end of November.

================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_6AbsrTps4EPyD0gKBzkMIC90)
 Call ID: call_6AbsrTps4EPyD0gKBzkMIC90
  Args:
    update_type: todo

================================= Tool Message =================================
Document ed0af900-52fa-4f15-907c-1aed1e17b0ce updated:
Plan: Add a deadline for the swim lessons task to ensure it is completed by the end of November.
Added content: 2024-11-30T23:59:59

================================== Ai Message ==================================
I've updated the swim lessons task with a deadline to be completed by the end of November.
If there's anything else you need, just let me know!

Analysis

  • Trustcall recognizes this as an update to an existing ToDo
  • It updates the document with the PatchDoc tool
  • The deadline is set to 2024-11-30T23:59:59
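
To make the idea of patching a document concrete, here is a minimal sketch of applying JSON Patch style operations to a ToDo dict. It assumes patches shaped like `{"op", "path", "value"}`; the exact payload Trustcall produces is not shown in this document. `apply_patches` is a hypothetical helper that handles only "add" and "replace" on dict keys, not list indices.

```python
import copy


def apply_patches(document, patches):
    """Return a patched copy; supports 'add'/'replace' on (nested) dict keys."""
    result = copy.deepcopy(document)
    for patch in patches:
        if patch["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported op: {patch['op']}")
        # "/deadline" -> ["deadline"]; "/a/b" -> ["a", "b"]
        keys = [k for k in patch["path"].split("/") if k]
        target = result
        for key in keys[:-1]:
            target = target[key]
        target[keys[-1]] = patch["value"]
    return result


todo = {
    "task": "Book swim lessons for 1-year-old daughter.",
    "deadline": None,
    "status": "not started",
}
patched = apply_patches(
    todo,
    [{"op": "replace", "path": "/deadline", "value": "2024-11-30T23:59:59"}],
)
print(patched["deadline"])
```

Patching changes only the targeted field and leaves the rest of the record untouched, which is cheaper and safer than asking the model to regenerate the whole ToDo.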

6.8 Adding More ToDos (Watching the Automatic Updates)

python
# User input with a new ToDo
input_messages = [HumanMessage(
    content="Need to call back City Toyota to schedule car service."
)]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
Need to call back City Toyota to schedule car service.

================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_tDuYZL7njpwOkg2YMEcf6DDJ)
 Call ID: call_tDuYZL7njpwOkg2YMEcf6DDJ
  Args:
    update_type: todo

================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Call back City Toyota to schedule car service.',
          'time_to_complete': 10,
          'solutions': ["Find City Toyota's contact number",
                       'Check car service availability',
                       'Prepare car details for service scheduling'],
          'status': 'not started'}

Document a77482f0-d654-4b41-ab74-d6f2b343a969 updated:
Plan: Add specific local businesses or vendors to the solutions for fixing the jammed electric Yale lock.
Added content: Contact City Locksmith SF

================================== Ai Message ==================================
I've added "Call back City Toyota to schedule car service" to your ToDo list.
If you need any assistance with that, just let me know!

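Verifying procedural memory again

  • A new task was created: schedule the car service
  • The lock-repair task was updated automatically with a local business: "Contact City Locksmith SF" (the Tool Message reports only the first patch value, so the stored item below shows one more new entry)

List all ToDos:

python
for memory in across_thread_memory.search(("todo", "Lance")):
    print(memory.value)

Output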

python
{'task': 'Book swim lessons for 1-year-old daughter.',
 'time_to_complete': 30,
 'deadline': '2024-11-30T23:59:59',
 'solutions': ['Check local swim schools in SF', ...],
 'status': 'not started'}

{'task': 'Fix the jammed electric Yale lock on the door.',
 'time_to_complete': 60,
 'deadline': None,
 'solutions': ['Contact a local locksmith in SF',
               "Check Yale's customer support for troubleshooting",
               'Look for repair guides online',
               'Contact City Locksmith SF',          # newly added
               'Visit SF Lock and Key for assistance'], # newly added
 'status': 'not started'}

{'task': 'Call back City Toyota to schedule car service.',
 'time_to_complete': 10,
 'deadline': None,
 'solutions': [...],
 'status': 'not started'}

6.9 Accessing Memory Across Sessions

Now let's start a new session (a different thread_id):

python
# A new thread_id, but the same user_id
config = {
    "configurable": {
        "thread_id": "2",      # new session
        "user_id": "Lance"     # same user
    }
}

# User input
input_messages = [HumanMessage(
    content="I have 30 minutes, what tasks can I get done?"
)]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
I have 30 minutes, what tasks can I get done?

================================== Ai Message ==================================
You can work on the following tasks that fit within your 30-minute timeframe:

1. **Book swim lessons for your 1-year-old daughter.**
   - Estimated time to complete: 30 minutes
   - Solutions include checking local swim schools in SF, looking for baby swim classes online,
     asking friends for recommendations, contacting La Petite Baleen Swim School,
     or checking with SF Recreation and Parks for classes.

2. **Call back City Toyota to schedule car service.**
   - Estimated time to complete: 10 minutes
   - Solutions include finding City Toyota's contact number, checking car service availability,
     and preparing car details for service scheduling.

You can choose either of these tasks to complete within your available time.

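The power of long-term memory

  • This is a brand-new session (thread_id: "2")
  • The Agent still remembers every ToDo item
  • The Agent recommends tasks that fit the time limit (30 minutes)
  • The Agent provides detailed solutions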
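
In the demo the model makes this selection itself by reading the ToDo list in its prompt. Purely as an illustration of the same selection logic, a deterministic version could look like the sketch below; `tasks_that_fit` is a hypothetical helper and is not part of the Agent.

```python
def tasks_that_fit(todos, minutes_available):
    """Return unfinished tasks whose estimate fits the time, shortest first."""
    fitting = [
        t for t in todos
        if t.get("status") != "done"
        and t.get("time_to_complete") is not None
        and t["time_to_complete"] <= minutes_available
    ]
    return sorted(fitting, key=lambda t: t["time_to_complete"])


# Values taken from the stored ToDo items shown earlier.
todos = [
    {"task": "Book swim lessons for 1-year-old daughter.",
     "time_to_complete": 30, "status": "not started"},
    {"task": "Fix the jammed electric Yale lock on the door.",
     "time_to_complete": 60, "status": "not started"},
    {"task": "Call back City Toyota to schedule car service.",
     "time_to_complete": 10, "status": "not started"},
]

for todo in tasks_that_fit(todos, 30):
    print(todo["time_to_complete"], todo["task"])
```

The lock repair (60 minutes) is excluded, matching the two tasks the Agent suggested.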

Continue the conversation:

python
# User input
input_messages = [HumanMessage(
    content="Yes, give me some options to call for swim lessons."
)]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
Yes, give me some options to call for swim lessons.

================================== Ai Message ==================================
Here are some options you can consider for booking swim lessons for your 1-year-old daughter in San Francisco:

1. **La Petite Baleen Swim School**: Known for their baby swim classes,
   you can contact them to inquire about their schedule and availability.

2. **SF Recreation and Parks**: They often offer swim classes for young children.
   Check their website or contact them for more information.

3. **Local Swim Schools**: Search for other local swim schools in SF that offer baby swim classes.
   You might find some good options nearby.

4. **Ask Friends for Recommendations**: Reach out to friends or family in the area
   who might have experience with swim lessons for young children.

These options should help you get started on booking swim lessons.

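Cross-session continuity

  • The Agent resolves the context ("Yes" refers to the swim lessons) from thread 2's own conversation history
  • The specific businesses it lists come from the ToDo stored in long-term memory
  • The conversation flows as naturally as if it had never been interrupted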

Part 7. Python Concepts Recap

7.1 Type Hints

python
from typing import Optional, Literal, TypedDict

# Basic types
name: str = "Alice"
age: int = 30
height: float = 1.75

# Optional: the value may be missing
middle_name: Optional[str] = None  # either str or None

# list: a list with typed elements
names: list[str] = ["Alice", "Bob"]

# Literal: restricted to specific values
status: Literal["active", "inactive"] = "active"

# TypedDict: a dict with typed keys
class Config(TypedDict):
    host: str
    port: int

config: Config = {"host": "localhost", "port": 8080}

What they are for

  • Enable code completion and static type checking
  • Make code easier to read and maintain
  • Help catch potential errors early

7.2 Pydantic Models

python
from typing import Optional

from pydantic import BaseModel, Field

class User(BaseModel):
    name: str = Field(description="User name")
    age: int = Field(ge=0, le=150, description="Age")
    email: Optional[str] = None
    tags: list[str] = Field(default_factory=list)

# Create an instance
user = User(name="Alice", age=30)

# Validation runs automatically
try:
    invalid_user = User(name="Bob", age=-5)  # raises a validation error
except ValueError as e:  # pydantic's ValidationError subclasses ValueError
    print(e)

# Convert to a dict
user_dict = user.model_dump()

# Convert to JSON
user_json = user.model_dump_json()

Advantages

  • Automatic data validation
  • Clear data structures
  • Easy serialization and deserialization

7.3 List Comprehensions and Generator Expressions

python
# List comprehension
squares = [x**2 for x in range(10)]
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# List comprehension with a condition
even_squares = [x**2 for x in range(10) if x % 2 == 0]
# [0, 4, 16, 36, 64]

# Nested list comprehension
matrix = [[i*j for j in range(3)] for i in range(3)]
# [[0, 0, 0], [0, 1, 2], [0, 2, 4]]

# Generator expression (memory efficient)
gen = (x**2 for x in range(1000000))  # values are computed lazily

7.4 Special Methods (Magic Methods)

python
class Counter:
    def __init__(self, start=0):
        """Constructor: initializes the object"""
        self.count = start

    def __call__(self):
        """Makes the object callable"""
        self.count += 1
        return self.count

    def __str__(self):
        """String form used by print()"""
        return f"Counter: {self.count}"

    def __repr__(self):
        """Developer-friendly representation"""
        return f"Counter(start={self.count})"

# Usage
counter = Counter(10)
print(counter())  # 11, via __call__
print(counter)    # Counter: 11, via __str__

7.5 Common Built-in Functions

python
# enumerate: get the index and the value together
for i, item in enumerate(['a', 'b', 'c']):
    print(f"{i}: {item}")

# zip: iterate in parallel
names = ['Alice', 'Bob']
ages = [25, 30]
for name, age in zip(names, ages):
    print(f"{name} is {age}")

# any/all: logical tests
all([True, True, True])   # True
any([False, True, False]) # True

# map: apply a function to each item
squares = list(map(lambda x: x**2, [1, 2, 3]))  # [1, 4, 9]

# filter: keep the items that pass a test
evens = list(filter(lambda x: x % 2 == 0, [1, 2, 3, 4]))  # [2, 4]

7.6 String Operations

python
# f-string: formatted string
name = "Alice"
age = 30
print(f"{name} is {age} years old")

# join: concatenate strings
words = ["Hello", "World"]
sentence = " ".join(words)  # "Hello World"

# split: break a string apart
sentence = "Hello World"
words = sentence.split()  # ["Hello", "World"]

# Multi-line string
text = """
Line 1
Line 2
Line 3
"""

Part 8. LangGraph Concepts Recap

8.1 Core Concepts

8.1.1 State

python
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph import MessagesState

# MessagesState is a built-in state type
# with a messages field (a list of messages)
state = {
    "messages": [
        HumanMessage(content="Hi"),
        AIMessage(content="Hello!")
    ]
}

# Custom state
from typing import TypedDict, Annotated
from langgraph.graph import add_messages

class CustomState(TypedDict):
    messages: Annotated[list, add_messages]  # updated through a reducer
    user_info: dict
    counter: int

Reducers

  • add_messages: appends new messages and replaces existing ones that share an id
  • Custom reducers: control how a state key is updated
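
A reducer is just a function that takes the existing value and the incoming update and returns the merged value. The toy reducer below imitates the append-or-replace-by-id behaviour described above using plain dicts. It is a simplified sketch, not the actual `add_messages` implementation, and `append_or_replace` is a hypothetical name.

```python
def append_or_replace(existing, new):
    """Merge message dicts: a matching 'id' overwrites, a new id appends."""
    merged = list(existing)
    index_by_id = {m["id"]: i for i, m in enumerate(merged)}
    for message in new:
        if message["id"] in index_by_id:
            merged[index_by_id[message["id"]]] = message
        else:
            index_by_id[message["id"]] = len(merged)
            merged.append(message)
    return merged


existing = [
    {"id": "1", "content": "Hi"},
    {"id": "2", "content": "Hello!"},
]
update = [
    {"id": "2", "content": "Hello, Lance!"},  # same id: replaces
    {"id": "3", "content": "Add a todo"},     # new id: appended
]
merged = append_or_replace(existing, update)
print([m["content"] for m in merged])
```

Without a reducer, a node's returned value would simply overwrite the key; with one, each node can return only its new messages.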

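8.1.2 Nodes

python
from langchain_core.messages import AIMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import MessagesState
from langgraph.store.base import BaseStore

def my_node(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """
    Node function signature:
    - state: the current state
    - config: run configuration (optional)
    - store: persistent store (optional)

    Returns: a state update (dict)
    """
    # Processing logic (some_operation is a placeholder)
    result = some_operation(state["messages"])

    # Return the state update
    return {"messages": [AIMessage(content=result)]}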

8.1.3 Edges

python
from langgraph.graph import StateGraph, START, END

builder = StateGraph(MessagesState)

# 1. Fixed edges
builder.add_edge(START, "node1")      # start → node1
builder.add_edge("node1", "node2")    # node1 → node2
builder.add_edge("node2", END)        # node2 → end

# 2. Conditional edges
def router(state):
    if some_condition(state):
        return "path_a"
    else:
        return "path_b"

builder.add_conditional_edges(
    "decision_node",  # source node
    router,           # routing function
    {                 # route mapping (optional)
        "path_a": "node_a",
        "path_b": "node_b"
    }
)

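8.2 The Memory System

8.2.1 Checkpointer (Short-Term Memory)

python
from langgraph.checkpoint.memory import MemorySaver

# Keep checkpoints in memory
checkpointer = MemorySaver()

# Pass it in at compile time
graph = builder.compile(checkpointer=checkpointer)

# Use thread_id to tell sessions apart
config = {"configurable": {"thread_id": "session_1"}}
graph.invoke(input_data, config)  # input_data: the graph input, e.g. {"messages": [...]}

What it provides

  • Saves the conversation history
  • Supports interrupting and resuming
  • Enables multi-turn conversations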

8.2.2 Store (Long-Term Memory)

python
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

# Save data
store.put(
    namespace=("profile", "user_123"),  # namespace (hierarchical)
    key="basic_info",                   # key
    value={"name": "Alice", "age": 30}  # value
)

# Retrieve one item
item = store.get(
    namespace=("profile", "user_123"),
    key="basic_info"
)

# Search a namespace (the prefix is passed positionally, as in section 6)
items = store.search(("profile", "user_123"))

Namespace

  • A tuple expresses the hierarchy
  • For example: ("profile", "user_id"), ("todo", "user_id")
  • Makes data easy to organize and query

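8.3 Streaming Execution

python
# stream: run the graph and stream results
for chunk in graph.stream(input_data, config, stream_mode="values"):
    # with "values", each chunk is the full state after an update
    print(chunk)

# stream_mode options:
# - "values": the full state
# - "updates": only what each node changed
# - "messages": LLM output streamed token by token, with metadata

# invoke: run to completion and return the final state
result = graph.invoke(input_data, config)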

8.4 Tool Calling

python
from langchain_core.tools import tool

@tool
def calculator(expression: str) -> float:
    """Evaluate a math expression"""
    # eval is unsafe on untrusted input; it is used here for demonstration only
    return eval(expression)

# Bind the tool to the model
model_with_tools = model.bind_tools([calculator])

# Invoke
response = model_with_tools.invoke("What is 2+2?")

# Inspect the tool calls
if response.tool_calls:
    for tool_call in response.tool_calls:
        print(tool_call['name'])  # tool name
        print(tool_call['args'])  # arguments
        print(tool_call['id'])    # call ID

8.5 A Complete Example

python
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver

# 1. Define the node (model is a chat model created elsewhere)
def chatbot(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

# 2. Build the graph
builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

# 3. Compile
graph = builder.compile(checkpointer=MemorySaver())

# 4. Run
config = {"configurable": {"thread_id": "1"}}
result = graph.invoke(
    {"messages": [HumanMessage(content="Hello!")]},
    config
)

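Part 9. Architecture Design Notes

9.1 Why Conditional Edges?

Conditional edges let the Agent take a different path depending on the situation:

User message → task_mAIstro → [decision]
                              ├→ no update needed → respond directly
                              ├→ update Profile → save personal info
                              ├→ update ToDo → save the task
                              └→ update Instructions → save preferences

Without conditional edges, the Agent would either:

  • Save memory on every turn (wasting resources)
  • Never save memory (unable to learn)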
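
The decision above can be sketched as a plain routing function. This is a simplified illustration that works on a dict-shaped message rather than a real AIMessage. The node names update_profile and update_todos appear in this document, while update_instructions and the `END` placeholder string are assumptions made for the sketch.

```python
END = "__end__"  # placeholder for the graph's end marker in this sketch

ROUTES = {
    "user": "update_profile",
    "todo": "update_todos",
    "instructions": "update_instructions",  # assumed node name
}


def route_message(last_message):
    """Pick the next node from the last AI message's tool calls."""
    tool_calls = last_message.get("tool_calls") or []
    if not tool_calls:
        return END  # no memory update needed: respond and finish
    update_type = tool_calls[0]["args"]["update_type"]
    if update_type not in ROUTES:
        raise ValueError(f"unknown update_type: {update_type}")
    return ROUTES[update_type]


print(route_message({"content": "Hello!"}))
print(route_message(
    {"tool_calls": [{"name": "UpdateMemory", "args": {"update_type": "todo"}}]}
))
```

The string returned by the function names the next node, so the graph only visits a memory-update node when the model actually asked for one.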

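9.2 Why Return to task_mAIstro After an Update?

python
builder.add_edge("update_todos", "task_mAIstro")

This forms a loop:

task_mAIstro → update_todos → task_mAIstro → [end or continue]

Reasons

  1. Generating a natural-language reply: after updating memory, the Agent needs to tell the user
  2. Multiple updates may be needed: one conversation turn can trigger several updates
  3. Keeping the conversation flowing: the Agent gets a chance to continue the interaction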

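9.3 Why Does ToDo Use Spy?

python
spy = Spy()
todo_extractor = create_extractor(...).with_listeners(on_end=spy)

Reasons

  • ToDo items change often (added, modified, completed)
  • The user needs to know exactly what changed
  • Spy captures Trustcall's operations (create, update, delete)
  • This allows detailed feedback to the user

Comparison

  • Profile updates are not reported to the user (privacy)
  • Instructions updates need no detail (internal logic)
  • ToDo updates need confirmation (the user's main concern)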

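9.4 Long-Term vs Short-Term Memory

Property     | Short-term memory (Checkpointer)     | Long-term memory (Store)
Scope        | A single session (thread)            | Across sessions (user)
Contents     | Conversation history                 | Structured knowledge
Lifetime     | May be cleared when the session ends | Kept across sessions
Access       | Loaded automatically                 | Must be queried explicitly
Typical use  | Understanding context                | Personalization, accumulating knowledge

Design principles

  • Short-term memory: fast access, managed automatically
  • Long-term memory: carefully organized, retrieved explicitly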

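Part 10. Practical Advice

10.1 When to Use a Memory Agent

Good fits:

  1. Personalization is needed: remembering user preferences and information
  2. Long-running interaction: staying coherent across sessions
  3. Task management: tracking todos, projects, and the like
  4. Learning user habits: improving response quality

Poor fits:

  1. One-off queries: simple Q&A
  2. Stateless services: every request is independent
  3. Strict privacy requirements: user data must not be stored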

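10.2 Schema Design Principles

  1. Clear field descriptions

    python
    name: str = Field(description="The user's full name, including family and given name")
  2. Sensible defaults

    python
    status: Literal["not started", "in progress", "done"] = "not started"
  3. Appropriate validation rules

    python
    age: int = Field(ge=0, le=150)  # age between 0 and 150
  4. Use default_factory for mutable defaults

    python
    # Preferred: a fresh list is built for every instance
    tags: list[str] = Field(default_factory=list)
    
    # Pydantic copies this default per instance, so it is not actually shared,
    # but plain classes and function defaults would share one list
    tags: list[str] = Field(default=[])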

10.3 Prompt Engineering Tips

  1. A clear role definition

    You are a helpful chatbot designed to...
  2. Enough context

    xml
    <user_profile>
    {profile}
    </user_profile>
  3. An explicit list of instructions

    1. Do this
    2. Then do that
    3. Finally do this
  4. Examples and formats

    For example: "User lives in San Francisco" → location: "San Francisco"

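10.4 Debugging Tips

  1. Trace with LangSmith

    python
    import os
    os.environ["LANGSMITH_TRACING"] = "true"
  2. Print intermediate state

    python
    for chunk in graph.stream(...):
        print(chunk)  # inspect the state at each step
  3. Monitor tool calls with Spy

    python
    spy = Spy()
    extractor = extractor.with_listeners(on_end=spy)  # returns a new runnable
    # ... invoke extractor here ...
    print(spy.called_tools)  # the tool calls actually made
  4. Visualize the graph

    python
    from IPython.display import Image, display
    display(Image(graph.get_graph().draw_mermaid_png()))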

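Part 11. Going Further

11.1 Possible Improvements

  1. Richer ToDo features

    • Task priority
    • Subtasks
    • Tags and categories
    • Recurring tasks
  2. Smart reminders

    • Deadline-based reminders
    • Context-based suggestions ("You're free right now, would you like to...?")
  3. Collaboration

    • Sharing tasks with other users
    • Team todo lists
  4. Data persistence

    • Use a real database (PostgreSQL, MongoDB)
    • Instead of InMemoryStore
  5. More sophisticated memory management

    • Importance scores for memories
    • Automatic archiving of old memories
    • Linking and indexing memories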

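11.2 Security and Privacy Considerations

  1. Data encryption

    • Store sensitive information encrypted
    • Encrypt data in transit
  2. Access control

    • User authentication
    • Fine-grained permissions
  3. Data retention policy

    • Delete expired data automatically
    • Let users delete their own data
  4. Audit logging

    • Record all data access
    • Monitor for abnormal behavior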

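11.3 Performance Optimization

  1. Caching

    • Cache frequently accessed memories
    • Reduce database queries
  2. Batching

    • Update several memories in one batch
    • Reduce network round trips
  3. Asynchronous processing

    • Non-blocking memory updates
    • Background task processing
  4. Index optimization

    • Create indexes for common queries
    • Use full-text search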

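Part 12. Summary

12.1 Key Takeaways

  1. A Memory Agent is an agent with long-term memory that keeps state across sessions

  2. Memory types

    • Semantic memory: facts and knowledge (Profile, ToDo)
    • Procedural memory: rules of behavior (Instructions)
    • Short-term vs long-term memory
  3. Trustcall extracts and updates structured data:

    • Extracts data from the conversation automatically
    • Updates existing records with JSON Patch
    • Supports parallel operations
  4. LangGraph provides flexible state management and flow control:

    • A state-graph architecture
    • Conditional edges for routing
    • Checkpointer and Store for short-term and long-term memory respectively
  5. ReAct architecture: the Agent can:

    • Reason: analyze the user's input
    • Act: call tools to update memory
    • Observe: read the tool results and respond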

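12.2 Skills Learned

Python skills

  • Defining and validating Pydantic models
  • Type hints and the type system
  • List comprehensions and generators
  • Special methods (__init__, __call__)
  • Decorators and the listener pattern

LangChain/LangGraph skills

  • Building state graphs
  • Defining nodes and edges
  • Conditional routing
  • Tool calling
  • Memory management (Checkpointer and Store)
  • Streaming execution

AI engineering skills

  • Prompt engineering
  • Agent architecture design
  • Schema design
  • Debugging and tracing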

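12.3 Next Steps

Keep learning:

  1. The other parts of Module-5

    • Memory Store in detail
    • Memory Schema (Profile and Collection)
  2. Advanced Agent patterns

    • Multi-Agent collaboration
    • Dynamic tool selection
    • More complex decision logic
  3. Production deployment

    • Using a real database
    • API design
    • Monitoring and logging
    • Performance optimization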

Appendix: Full Code Listing

A.1 Data Model Definitions

python
from pydantic import BaseModel, Field
from typing import Optional, Literal, TypedDict
from datetime import datetime

class Profile(BaseModel):
    """User profile"""
    name: Optional[str] = Field(description="The user's name", default=None)
    location: Optional[str] = Field(description="The user's location", default=None)
    job: Optional[str] = Field(description="The user's job", default=None)
    connections: list[str] = Field(
        description="Personal connections (family, friends, coworkers)",
        default_factory=list
    )
    interests: list[str] = Field(
        description="The user's interests",
        default_factory=list
    )

class ToDo(BaseModel):
    """A todo item"""
    task: str = Field(description="Task description")
    time_to_complete: Optional[int] = Field(
        description="Estimated time to complete (minutes)"
    )
    deadline: Optional[datetime] = Field(
        description="Deadline",
        default=None
    )
    solutions: list[str] = Field(
        description="List of concrete, actionable solutions",
        min_items=1,  # Pydantic v2 spells this min_length; min_items is deprecated there
        default_factory=list
    )
    status: Literal["not started", "in progress", "done", "archived"] = Field(
        description="Task status",
        default="not started"
    )

class UpdateMemory(TypedDict):
    """Tool for updating memory"""
    update_type: Literal['user', 'todo', 'instructions']

A.2 Helper Utilities

python
class Spy:
    """Records the tool calls made inside Trustcall"""
    def __init__(self):
        self.called_tools = []

    def __call__(self, run):
        q = [run]
        while q:
            r = q.pop()
            if r.child_runs:
                q.extend(r.child_runs)
            if r.run_type == "chat_model":
                self.called_tools.append(
                    r.outputs["generations"][0][0]["message"]["kwargs"]["tool_calls"]
                )

def extract_tool_info(tool_calls, schema_name="Memory"):
    """Summarize the tool calls as human-readable change descriptions"""
    changes = []

    for call_group in tool_calls:
        for call in call_group:
            if call['name'] == 'PatchDoc':
                changes.append({
                    'type': 'update',
                    'doc_id': call['args']['json_doc_id'],
                    'planned_edits': call['args']['planned_edits'],
                    'value': call['args']['patches'][0]['value']
                })
            elif call['name'] == schema_name:
                changes.append({
                    'type': 'new',
                    'value': call['args']
                })

    result_parts = []
    for change in changes:
        if change['type'] == 'update':
            result_parts.append(
                f"Document {change['doc_id']} updated:\n"
                f"Plan: {change['planned_edits']}\n"
                f"Added content: {change['value']}"
            )
        else:
            result_parts.append(
                f"New {schema_name} created:\n"
                f"Content: {change['value']}"
            )

    return "\n\n".join(result_parts)

A.3 System Prompts

python
MODEL_SYSTEM_MESSAGE = """You are a helpful chatbot.

You are designed to be a companion to a user, helping them keep track of their ToDo list.

You have a long term memory which keeps track of three things:
1. The user's profile (general information about them)
2. The user's ToDo list
3. General instructions for updating the ToDo list

Here is the current User Profile:
<user_profile>
{user_profile}
</user_profile>

Here is the current ToDo List:
<todo>
{todo}
</todo>

Here are the current user-specified preferences:
<instructions>
{instructions}
</instructions>

Instructions:
1. Reason carefully about the user's messages
2. Decide whether to update long-term memory:
   - Personal info → UpdateMemory tool with type `user`
   - Tasks → UpdateMemory tool with type `todo`
   - Preferences → UpdateMemory tool with type `instructions`
3. Tell the user about todo updates (not profile or instructions)
4. Err on the side of updating the todo list
5. Respond naturally to the user"""

TRUSTCALL_INSTRUCTION = """Reflect on following interaction.

Use the provided tools to retain any necessary memories about the user.

Use parallel tool calling to handle updates and insertions simultaneously.

System Time: {time}"""

CREATE_INSTRUCTIONS = """Reflect on the following interaction.

Based on this interaction, update your instructions for how to update ToDo list items.

Use any feedback from the user to update how they like to have items added.

Your current instructions are:
<current_instructions>
{current_instructions}
</current_instructions>"""

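Document version: 1.0. Last updated: 2024-11-04. Author: AI Assistant. Based on: LangChain Academy Module-5 Lesson 5.1

We hope this detailed walkthrough helps you understand LangGraph's Memory Agent system in depth!

Released under the MIT License. Content copyright belongs to the author.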