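5.4 Memory Schema - Collection: A Detailed Walkthrough

I. Overview

1.1 About This Section

This section is the fourth and final part of LangChain Academy Module 5. It moves memory from a single Profile (one user-profile object) to a Collection (a set of memories), which gives a more flexible and extensible memory system.

1.2 Profile vs Collection: The Core Difference

This distinction is the key to the whole section, so let us first pin down how the two memory modes differ.

Profile (a single object)

The Profile mode from Section 5.3

python
# A single user-profile object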
{
    "user_name": "Lance",
    "user_location": "San Francisco",
    "interests": ["biking", "bakeries"]
}

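Characteristics

  • ✅ Fixed structure
  • ✅ Explicit fields
  • ✅ Updates modify specific fields
  • ❌ Poorly suited to open-ended information gathering
  • ❌ Hard to add new kinds of information

Analogy: a form with a fixed layout.

Collection (a set of memories)

The Collection mode from Section 5.4

python
# Multiple independent memory items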
[
    {"content": "User's name is Lance."},
    {"content": "Lance likes to bike around San Francisco."},
    {"content": "Lance enjoys going to bakeries."}
]

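Characteristics

  • ✅ Flexible structure
  • ✅ Can keep growing as new memories arrive
  • ✅ Each memory is stored independently
  • ✅ New memories of any kind can be added
  • ✅ Supports parallel updates and inserts

Analogy: a notebook you can keep adding notes to.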

1.3 Side-by-Side Comparison

Profile mode (5.3)            Collection mode (5.4)
    ↓                               ↓
┌────────────────┐            ┌────────────────┐
│  User profile  │            │  Memory 1      │
│  ┌──────────┐  │            │  content: ...  │
│  │ name     │  │            └────────────────┘
│  │ location │  │            ┌────────────────┐
│  │ interests│  │            │  Memory 2      │
│  └──────────┘  │            │  content: ...  │
│  Single object │            └────────────────┘
└────────────────┘            ┌────────────────┐
                              │  Memory 3      │
Update = modify fields        │  content: ...  │
                              └────────────────┘
                              Many independent objects

                              Update = modify an existing item
                              Insert = add a new item

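1.4 Learning Objectives

After working through this section you will be able to:

  1. Define and use a Collection schema
  2. Explain what the enable_inserts=True parameter does
  3. Manage multiple memory items with UUIDs
  4. Describe how Trustcall handles updates and inserts in the same call
  5. Decide when to use a Profile and when to use a Collection

1.5 Use Cases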

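Where a Collection fits

| Scenario | Why a Collection |
|---|---|
| Study notes | New knowledge points can be added continually |
| Conversation records | Keeps important conversation snippets |
| User feedback | Collects multiple feedback entries |
| To-do items | Adds and manages multiple tasks |
| Open-ended Q&A | Records the user's varied questions |

Where a Profile fits

| Scenario | Why a Profile |
|---|---|
| User profile | Fixed fields (name, age, and so on) |
| System configuration | Structured settings |
| Authentication info | Well-defined credential fields |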

II. Collection Schema Basics

2.1 Defining the Memory Schema

Start by defining the structure of a single memory:

python
from pydantic import BaseModel, Field

class Memory(BaseModel):
    content: str = Field(
        description="The main content of the memory. For example: User expressed interest in learning about French."
    )

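Design notes

  1. Minimal design

    • Only one field: content
    • No predefined categories or tags
    • Maximum flexibility
  2. Descriptive guidance

    • The description tells the LLM how to use the field
    • The example inside the description shows the model what to write
  3. Open-ended content

    • Any kind of information can be stored
    • No structural constraint beyond a string

2.2 Defining the MemoryCollection Schema

Define the collection of memories (used to extract several at once):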

python
class MemoryCollection(BaseModel):
    memories: list[Memory] = Field(
        description="A list of memories about the user."
    )

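Purpose

  • Used with with_structured_output() to extract several memories in one call
  • The model returns a list containing multiple Memory objects

Python note

The type annotation list[Memory]

python
# Python 3.9+
memories: list[Memory]

# Python 3.8 and earlier
from typing import List
memories: List[Memory]

2.3 Why Design It This Way?

Why a simple schema is powerful here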

  1. Maximum flexibility

    python
    # Any kind of information can be stored
    Memory(content="User's name is Lance.")
    Memory(content="Lance likes biking.")
    Memory(content="Lance visited Paris in 2023.")
    Memory(content="Lance prefers dark mode in apps.")
  2. Easy to extend

    python
    from datetime import datetime
    from typing import Optional

    # More fields can be added later if they are needed
    class Memory(BaseModel):
        content: str
        category: Optional[str] = None      # optional category
        importance: Optional[int] = None    # optional importance score
        timestamp: datetime = Field(default_factory=datetime.now)
  3. Complements a Profile

    python
    # Profile: structured, fixed information
    profile = {
        "user_name": "Lance",
        "user_location": "San Francisco"
    }
    
    # Collection: flexible, open-ended information
    memories = [
        {"content": "Lance likes biking."},
        {"content": "Lance visited Tartine bakery."},
        {"content": "Lance is planning a trip to Japan."}
    ]

III. Extracting a Collection with with_structured_output

3.1 Basic Extraction

python
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

# Initialize the model
model = ChatOpenAI(model="gpt-4o", temperature=0)

# Bind the schema to the model
model_with_structure = model.with_structured_output(MemoryCollection)

# Invoke the model to extract memories
memory_collection = model_with_structure.invoke([
    HumanMessage("My name is Lance. I like to bike.")
])

# Inspect the result
print(memory_collection.memories)

Output

python
[
    Memory(content="User's name is Lance."),
    Memory(content='Lance likes to bike.')
]

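Analysis

  • The model splits the message into two independent memories
  • Each memory is a Memory object
  • The result conforms to the MemoryCollection structure

3.2 Serializing to a Dictionary

python
# A single memory
memory = memory_collection.memories[0]
print(memory.model_dump())
# Output: {'content': "User's name is Lance."}

# All memories
for mem in memory_collection.memories:
    print(mem.model_dump())

What model_dump() does

  • Converts a Pydantic model into a Python dictionary
  • Makes the data easy to save to a Store or a database
  • Produces a dictionary that can be serialized to JSON

3.3 Saving to the Store

Each memory in a Collection is saved independently, keyed by a unique UUID: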

python
import uuid
from langgraph.store.memory import InMemoryStore

# Initialize the Store
in_memory_store = InMemoryStore()

# Define the namespace
user_id = "1"
namespace_for_memory = (user_id, "memories")

# Save the first memory
key = str(uuid.uuid4())  # generate a unique ID
value = memory_collection.memories[0].model_dump()
in_memory_store.put(namespace_for_memory, key, value)

# Save the second memory
key = str(uuid.uuid4())  # another unique ID
value = memory_collection.memories[1].model_dump()
in_memory_store.put(namespace_for_memory, key, value)

Why use UUIDs?

| Fixed key | UUID keys |
|---|---|
| Holds only one memory | Holds as many memories as you write |
| A new memory overwrites the old one | Each memory is stored independently |
| Suits a Profile | Suits a Collection |

Storage layout

Store
└── ("1", "memories")  ← namespace
    ├── "e1c4e5ab-..." → {"content": "User's name is Lance."}
    └── "e132a1ea-..." → {"content": "Lance likes to bike."}
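
The effect of the key choice can be sketched with a plain dictionary standing in for the store. The `fake_store` dict and the `put` helper below are illustrative assumptions, not the LangGraph Store API; the point is only that one fixed key keeps the latest value, while a fresh UUID per write keeps every memory.

```python
import uuid

# Plain dict standing in for a store: {(namespace, key): value}.
# Illustrative assumption only; this is not the LangGraph Store API.
fake_store = {}

def put(namespace, key, value):
    """Write a value under (namespace, key), overwriting any previous value."""
    fake_store[(namespace, key)] = value

facts = ["User's name is Lance.", "Lance likes to bike."]

# Fixed key: the second write replaces the first.
for fact in facts:
    put(("1", "profile"), "user_memory", {"content": fact})

# UUID keys: each write lands in its own slot.
for fact in facts:
    put(("1", "memories"), str(uuid.uuid4()), {"content": fact})

profile_items = [v for (ns, _), v in fake_store.items() if ns == ("1", "profile")]
memory_items = [v for (ns, _), v in fake_store.items() if ns == ("1", "memories")]
print(len(profile_items), len(memory_items))
```

With the fixed key only the last fact survives; with UUID keys both facts are kept side by side.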

3.4 Searching Memories

python
# Search the memories in the namespace
for m in in_memory_store.search(namespace_for_memory):
    print(m.dict())

Output

python
{
    'value': {'content': "User's name is Lance."},
    'key': 'e1c4e5ab-ab0f-4cbb-822d-f29240a983af',
    'namespace': ['1', 'memories'],
    'created_at': '2024-10-30T21:43:26.893775+00:00',
    'updated_at': '2024-10-30T21:43:26.893779+00:00'
}
{
    'value': {'content': 'Lance likes to bike.'},
    'key': 'e132a1ea-6202-43ac-a9a6-3ecf2c1780a8',
    'namespace': ['1', 'memories'],
    'created_at': '2024-10-30T21:43:26.893833+00:00',
    'updated_at': '2024-10-30T21:43:26.893834+00:00'
}

IV. Trustcall's enable_inserts Parameter

4.1 Why Is enable_inserts Needed?

In Section 5.3, Trustcall was used to update a single object:

python
# Usage in 5.3 (Profile)
extractor = create_extractor(
    model,
    tools=[UserProfile],
    tool_choice="UserProfile"
    # no enable_inserts
)

For a Collection, we need to both add new items and update existing ones:

python
# Usage in 5.4 (Collection)
extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True  # the key setting
)

4.2 What enable_inserts Does

enable_inserts=True tells Trustcall that it may:

  1. ✅ Create new memory items
  2. ✅ Update existing memory items
  3. ✅ Do both in the same call (parallel tool calls)

Without enable_inserts (when existing items are supplied)

  • Only existing items are updated
  • No new items are added
  • Suits the Profile scenario

With enable_inserts

  • New items can be added
  • Existing items can still be updated
  • Suits the Collection scenario

4.3 Creating the Trustcall Extractor

python
from trustcall import create_extractor

# Define the Memory schema
class Memory(BaseModel):
    content: str = Field(
        description="The main content of the memory. For example: User expressed interest in learning about French."
    )

# Create the extractor
trustcall_extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True,  # enable inserts
)

V. Handling a Collection with Trustcall

5.1 Basic Extraction (Creating New Memories)

python
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

# Extraction instruction
instruction = """Extract memories from the following conversation:"""

# Conversation
conversation = [
    HumanMessage(content="Hi, I'm Lance."),
    AIMessage(content="Nice to meet you, Lance."),
    HumanMessage(content="This morning I had a nice bike ride in San Francisco.")
]

# Invoke the extractor
result = trustcall_extractor.invoke({
    "messages": [SystemMessage(content=instruction)] + conversation
})

Inspect the messages (tool calls)

python
for m in result["messages"]:
    m.pretty_print()

Output

================================== Ai Message ==================================
Tool Calls:
  Memory (call_Pj4kctFlpg9TgcMBfMH33N30)
 Call ID: call_Pj4kctFlpg9TgcMBfMH33N30
  Args:
    content: Lance had a nice bike ride in San Francisco this morning.

Inspect the responses (parsed objects)

python
for m in result["responses"]:
    print(m)

Output

content='Lance had a nice bike ride in San Francisco this morning.'

Inspect the metadata

python
for m in result["response_metadata"]:
    print(m)

Output

python
{'id': 'call_Pj4kctFlpg9TgcMBfMH33N30'}

5.2 Updates and Inserts (the Core Feature)

Now let us see how Trustcall handles updating existing memories and inserting new ones in the same call.

5.2.1 Preparing the Existing Memories

python
# The continued conversation
updated_conversation = [
    AIMessage(content="That's great, did you do after?"),
    HumanMessage(content="I went to Tartine and ate a croissant."),
    AIMessage(content="What else is on your mind?"),
    HumanMessage(content="I was thinking about my Japan, and going back this winter!"),
]

# Update instruction (defined here; the call in 5.2.2 passes only the conversation)
system_msg = """Update existing memories and create new ones based on the following conversation:"""

# Prepare the existing memories
tool_name = "Memory"
existing_memories = [
    (str(i), tool_name, memory.model_dump())
    for i, memory in enumerate(result["responses"])
] if result["responses"] else None

print(existing_memories)

Output

python
[
    (
        '0',                           # ID
        'Memory',                      # tool name
        {'content': 'Lance had a nice bike ride in San Francisco this morning.'}  # data
    )
]

Data format

Trustcall expects existing data in this format:

python
[
    (id, tool_name, data),
    (id, tool_name, data),
    ...
]
  • id: a string that identifies the memory item (used to target updates)
  • tool_name: the tool name ("Memory")
  • data: the item's data as a dictionary
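
As a minimal sketch of this format, the helper below turns stored key/value pairs into the tuples Trustcall expects. The function name `to_existing` and the shortened keys are hypothetical; only the `(id, tool_name, data)` shape comes from the text above.

```python
def to_existing(items, tool_name="Memory"):
    """Turn {key: value} pairs into (id, tool_name, data) tuples.

    Returns None when nothing is stored, mirroring the tutorial's code.
    """
    if not items:
        return None
    return [(key, tool_name, value) for key, value in items.items()]

# Hypothetical stored memories, keyed by shortened made-up IDs.
stored = {
    "key-a": {"content": "User's name is Lance."},
    "key-b": {"content": "Lance likes to bike."},
}

existing = to_existing(stored)
for entry in existing:
    print(entry)
```

Using the real storage key as the id means an update returned by the extractor can be written straight back to the same slot.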

Python note

The list comprehension, written out in full

python
existing_memories = [
    (str(i), tool_name, memory.model_dump())
    for i, memory in enumerate(result["responses"])
] if result["responses"] else None

# Expanded, this is equivalent to:
if result["responses"]:
    existing_memories = []
    for i, memory in enumerate(result["responses"]):
        existing_memories.append((str(i), tool_name, memory.model_dump()))
else:
    existing_memories = None

5.2.2 Calling Trustcall to Update and Insert

python
# Invoke the extractor
result = trustcall_extractor.invoke({
    "messages": updated_conversation,
    "existing": existing_memories
})

Inspect the tool calls

python
for m in result["messages"]:
    m.pretty_print()

Output

================================== Ai Message ==================================
Tool Calls:
  Memory (call_vxks0YH1hwUxkghv4f5zdkTr)
 Call ID: call_vxks0YH1hwUxkghv4f5zdkTr
  Args:
    content: Lance had a nice bike ride in San Francisco this morning. He went to Tartine and ate a croissant. He was thinking about his trip to Japan and going back this winter!
  Memory (call_Y4S3poQgFmDfPy2ExPaMRk8g)
 Call ID: call_Y4S3poQgFmDfPy2ExPaMRk8g
  Args:
    content: Lance went to Tartine and ate a croissant. He was thinking about his trip to Japan and going back this winter!

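Key observations

  • There are two Memory tool calls
  • The first contains all of the information (an update)
  • The second contains only the new information (an insert)

Inspect the responses

python
for m in result["responses"]:
    print(m)

Output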

content='Lance had a nice bike ride in San Francisco this morning. He went to Tartine and ate a croissant. He was thinking about his trip to Japan and going back this winter!'
content='Lance went to Tartine and ate a croissant. He was thinking about his trip to Japan and going back this winter!'

Inspect the metadata (the important part)

python
for m in result["response_metadata"]:
    print(m)

Output

python
{'id': 'call_vxks0YH1hwUxkghv4f5zdkTr', 'json_doc_id': '0'}
{'id': 'call_Y4S3poQgFmDfPy2ExPaMRk8g'}

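What the metadata tells us

| Response | Metadata | Meaning |
|---|---|---|
| First | 'json_doc_id': '0' | Updates the existing memory (ID '0') |
| Second | no json_doc_id | Inserts a new memory |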
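
The rule in this table can be sketched in a few lines of plain Python. The helper name `split_writes` is hypothetical and the sample data is abbreviated from the output above; the only behaviour taken from the tutorial is that a `json_doc_id` marks an update and its absence marks an insert.

```python
import uuid

def split_writes(responses, response_metadata):
    """Separate extractor results into updates (keyed by the existing ID)
    and inserts (keyed by a freshly generated UUID)."""
    updates, inserts = {}, {}
    for data, meta in zip(responses, response_metadata):
        doc_id = meta.get("json_doc_id")
        if doc_id is not None:
            updates[doc_id] = data                 # overwrite the existing item
        else:
            inserts[str(uuid.uuid4())] = data      # brand-new item
    return updates, inserts

# Sample data shaped like the output above (contents abbreviated).
responses = [
    {"content": "Lance had a nice bike ride... He went to Tartine..."},
    {"content": "Lance went to Tartine and ate a croissant..."},
]
response_metadata = [
    {"id": "call_vxks0YH1hwUxkghv4f5zdkTr", "json_doc_id": "0"},
    {"id": "call_Y4S3poQgFmDfPy2ExPaMRk8g"},
]

updates, inserts = split_writes(responses, response_metadata)
print(list(updates), len(inserts))
```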

5.3 How Trustcall Decides

Trustcall lets the model decide which memories to update and which to insert:

Original memory:
- Lance had a nice bike ride in San Francisco this morning.

New conversation:
- I went to Tartine and ate a croissant.
- I was thinking about Japan, and going back this winter!

Trustcall's decision:
┌─────────────────────────────────────────────────┐
│ Update memory 0:                                │
│ "Lance had a nice bike ride... He went to       │
│  Tartine... He was thinking about his trip..."  │
│ (merges old and new information)                │
└─────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────┐
│ Insert new memory:                              │
│ "Lance went to Tartine... He was thinking       │
│  about his trip to Japan..."                    │
│ (a separate new memory)                         │
└─────────────────────────────────────────────────┘

Why work this way?

  1. Consolidation: related information is merged into the same memory
  2. Independence: unrelated or self-contained events become new memories
  3. Less redundancy: the aim is to avoid creating exact duplicates, although the two memories above still overlap
  4. Flexibility: the decision adapts to the content

VI. Building a Chatbot with the Collection Schema

6.1 Full Implementation

python
from IPython.display import Image, display
import uuid
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.memory import InMemoryStore
from langchain_core.messages import merge_message_runs, HumanMessage, SystemMessage
from langchain_core.runnables.config import RunnableConfig
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore
from langchain_openai import ChatOpenAI
from trustcall import create_extractor
from pydantic import BaseModel, Field

# Initialize the model
model = ChatOpenAI(model="gpt-4o", temperature=0)

# Memory schema
class Memory(BaseModel):
    content: str = Field(
        description="The main content of the memory. For example: User expressed interest in learning about French."
    )

# Create the Trustcall extractor
trustcall_extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True,  # allow new memories to be inserted
)

# System prompts
MODEL_SYSTEM_MESSAGE = """You are a helpful chatbot. You are designed to be a companion to a user.

You have a long term memory which keeps track of information you learn about the user over time.

Current Memory (may include updated memories from this conversation):

{memory}"""

TRUSTCALL_INSTRUCTION = """Reflect on following interaction.

Use the provided tools to retain any necessary memories about the user.

Use parallel tool calling to handle updates and insertions simultaneously:"""

6.2 The call_model Node

python
def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Load memories from the store and use them to personalize the chatbot's response."""

    # Get the user ID
    user_id = config["configurable"]["user_id"]

    # Retrieve memories from the store
    namespace = ("memories", user_id)
    memories = store.search(namespace)

    # Format the memories for the system prompt
    info = "\n".join(f"- {mem.value['content']}" for mem in memories)
    system_msg = MODEL_SYSTEM_MESSAGE.format(memory=info)

    # Generate a response using the memories and the chat history
    response = model.invoke([
        SystemMessage(content=system_msg)
    ] + state["messages"])

    return {"messages": response}

Code analysis

Lines 7-9: retrieve the memories

python
namespace = ("memories", user_id)
memories = store.search(namespace)
  • Uses search() rather than get()
  • Because a Collection holds multiple memory items
  • Returns the memories under that namespace (search() caps the result count with its limit argument, which defaults to 10)

Lines 11-13: format the memories

python
info = "\n".join(f"- {mem.value['content']}" for mem in memories)
system_msg = MODEL_SYSTEM_MESSAGE.format(memory=info)

Formatting example

python
# memories = [
#     Item(value={'content': "User's name is Lance."}),
#     Item(value={'content': 'Lance likes to bike around San Francisco.'})
# ]

info = """
- User's name is Lance.
- Lance likes to bike around San Francisco.
"""

The generator expression in detail

python
"\n".join(f"- {mem.value['content']}" for mem in memories)

# Equivalent to:
result = []
for mem in memories:
    result.append(f"- {mem.value['content']}")
info = "\n".join(result)

6.3 The write_memory Node

python
def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Reflect on the chat history and update the memory collection."""

    # Get the user ID
    user_id = config["configurable"]["user_id"]

    # Define the namespace
    namespace = ("memories", user_id)

    # Retrieve the existing memories for context
    existing_items = store.search(namespace)

    # Format the existing memories for the Trustcall extractor
    tool_name = "Memory"
    existing_memories = (
        [(existing_item.key, tool_name, existing_item.value)
         for existing_item in existing_items]
        if existing_items
        else None
    )

    # Merge the chat history with the instruction
    updated_messages = list(merge_message_runs(
        messages=[SystemMessage(content=TRUSTCALL_INSTRUCTION)] + state["messages"]
    ))

    # Invoke the extractor
    result = trustcall_extractor.invoke({
        "messages": updated_messages,
        "existing": existing_memories
    })

    # Save the memories returned by Trustcall to the store
    for r, rmeta in zip(result["responses"], result["response_metadata"]):
        store.put(
            namespace,
            rmeta.get("json_doc_id", str(uuid.uuid4())),  # the key line
            r.model_dump(mode="json"),
        )

Line-by-line analysis

Line 11: retrieve the existing memories

python
existing_items = store.search(namespace)
  • Fetches the existing memory items
  • They are passed to Trustcall as context

Lines 14-20: convert to the Trustcall format

python
tool_name = "Memory"
existing_memories = (
    [(existing_item.key, tool_name, existing_item.value)
     for existing_item in existing_items]
    if existing_items
    else None
)

Example of the formatted result

python
[
    ('dee65880-...', 'Memory', {'content': "User's name is Lance."}),
    ('662195fc-...', 'Memory', {'content': 'Lance likes to bike...'})
]

Lines 22-25: merge the messages

python
updated_messages = list(merge_message_runs(
    messages=[SystemMessage(content=TRUSTCALL_INSTRUCTION)] + state["messages"]
))

What merge_message_runs() does

python
# Before merging:
[
    HumanMessage("Hi"),
    HumanMessage("How are you?"),
    AIMessage("I'm good"),
    AIMessage("Thanks for asking")
]

# After merging:
[
    HumanMessage("Hi\nHow are you?"),
    AIMessage("I'm good\nThanks for asking")
]
  • Merges consecutive messages that share a role
  • Reduces the number of messages
  • Keeps the input to the extractor compact
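
The merging behaviour can be illustrated with a small pure-Python sketch. It is a simplified stand-in that works on hypothetical (role, text) tuples rather than LangChain message objects, it is not the library's implementation, and it assumes plain string content joined with a newline.

```python
def merge_runs(messages):
    """Merge consecutive (role, text) pairs that share a role,
    joining their text with a newline."""
    merged = []
    for role, text in messages:
        if merged and merged[-1][0] == role:
            # Same role as the previous message: extend it.
            merged[-1] = (role, merged[-1][1] + "\n" + text)
        else:
            # Role changed: start a new run.
            merged.append((role, text))
    return merged

chat = [
    ("human", "Hi"),
    ("human", "How are you?"),
    ("ai", "I'm good"),
    ("ai", "Thanks for asking"),
]

merged_chat = merge_runs(chat)
print(merged_chat)
```

Messages with different roles are never merged, so a system instruction followed by a human message stays as two separate entries.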

Lines 27-31: call Trustcall

python
result = trustcall_extractor.invoke({
    "messages": updated_messages,
    "existing": existing_memories
})

Lines 34-39: save the results (the key logic)

python
for r, rmeta in zip(result["responses"], result["response_metadata"]):
    store.put(
        namespace,
        rmeta.get("json_doc_id", str(uuid.uuid4())),
        r.model_dump(mode="json"),
    )

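The core logic explained

python
rmeta.get("json_doc_id", str(uuid.uuid4()))

This line decides whether the write is an update or an insert.

| Case | rmeta contents | Key used | Operation |
|---|---|---|---|
| Update an existing memory | {'id': '...', 'json_doc_id': '0'} | The existing key (from existing_memories) | Update |
| Insert a new memory | {'id': '...'} | A new UUID | Insert |

Example

python
# Case 1: update
rmeta = {'id': 'call_xxx', 'json_doc_id': 'dee65880-...'}
key = rmeta.get("json_doc_id", str(uuid.uuid4()))
# key = 'dee65880-...' (the existing key)
# → updates the existing memory

# Case 2: insert
rmeta = {'id': 'call_yyy'}
key = rmeta.get("json_doc_id", str(uuid.uuid4()))
# key = a new UUID (for example 'f1234567-...')
# → inserts a new memory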

Python notes

The zip() function

python
responses = [mem1, mem2]
metadata = [meta1, meta2]

for r, rmeta in zip(responses, metadata):
    # First iteration:  r=mem1, rmeta=meta1
    # Second iteration: r=mem2, rmeta=meta2
    process(r, rmeta)

The default value of .get()

python
d = {'a': 1}
d.get('a', 0)  # returns 1 (the key exists)
d.get('b', 0)  # returns 0 (the key is missing, so the default is used)

6.4 Building the Graph

python
# Define the graph
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)

# Long-term memory (across threads)
across_thread_memory = InMemoryStore()

# Short-term memory (within a thread)
within_thread_memory = MemorySaver()

# Compile the graph
graph = builder.compile(
    checkpointer=within_thread_memory,
    store=across_thread_memory
)

Graph structure

START → call_model → write_memory → END

VII. Hands-On Demo

7.1 First Interaction: Creating the First Memory

python
# Configuration
config = {
    "configurable": {
        "thread_id": "1",
        "user_id": "1"
    }
}

# User input
input_messages = [HumanMessage(content="Hi, my name is Lance")]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
Hi, my name is Lance

================================== Ai Message ==================================
Hi Lance! It's great to meet you. How can I assist you today?

7.2 Second Interaction: Adding a New Memory

python
# User input
input_messages = [HumanMessage(content="I like to bike around San Francisco")]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
I like to bike around San Francisco

================================== Ai Message ==================================
That sounds like a lot of fun! San Francisco has some beautiful routes for biking.
Do you have a favorite trail or area you like to explore?

7.3 Inspecting the Memory Collection

python
# Define the namespace
user_id = "1"
namespace = ("memories", user_id)

# Search the stored memories
memories = across_thread_memory.search(namespace)

# Print each memory
for m in memories:
    print(m.dict())

Output

python
{
    'value': {'content': "User's name is Lance."},
    'key': 'dee65880-dd7d-4184-8ca1-1f7400f7596b',
    'namespace': ['memories', '1'],
    'created_at': '2024-10-30T22:18:52.413283+00:00',
    'updated_at': '2024-10-30T22:18:52.413284+00:00'
}
{
    'value': {'content': 'User likes to bike around San Francisco.'},
    'key': '662195fc-8ea4-4f64-a6b6-6b86d9cb85c0',
    'namespace': ['memories', '1'],
    'created_at': '2024-10-30T22:18:56.597813+00:00',
    'updated_at': '2024-10-30T22:18:56.597814+00:00'
}

Analysis

  • There are two independent memories
  • Each memory has its own UUID key
  • One records the name, the other the hobby

7.4 Adding More Memories

python
# User input
input_messages = [HumanMessage(content="I also enjoy going to bakeries")]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
I also enjoy going to bakeries

================================== Ai Message ==================================
Biking and bakeries make a great combination! Do you have a favorite bakery in
San Francisco, or are you on the hunt for new ones to try?

There should now be three memories, along these lines (the exact wording depends on the model):

  1. User's name is Lance.
  2. Lance likes to bike around San Francisco.
  3. Lance enjoys going to bakeries.

7.5 Cross-Session Test

python
# New thread, same user
config = {
    "configurable": {
        "thread_id": "2",  # new session
        "user_id": "1"     # same user
    }
}

# User input
input_messages = [HumanMessage(content="What bakeries do you recommend for me?")]

# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Output

================================ Human Message =================================
What bakeries do you recommend for me?

================================== Ai Message ==================================
Since you enjoy biking around San Francisco, you might like to check out some of
these bakeries that are both delicious and located in areas that are great for a
bike ride:

1. **Tartine Bakery** - Located in the Mission District, it's famous for its
   bread and pastries. The area is vibrant and perfect for a leisurely ride.

2. **Arsicault Bakery** - Known for its incredible croissants, it's in the
   Richmond District, which offers a nice ride through Golden Gate Park.

3. **B. Patisserie** - Situated in Lower Pacific Heights, this bakery is renowned
   for its kouign-amann and other French pastries. The neighborhood is charming
   and bike-friendly.

4. **Mr. Holmes Bakehouse** - Famous for its cruffins, it's located in the
   Tenderloin, which is a bit more urban but still accessible by bike.

5. **Noe Valley Bakery** - A cozy spot in Noe Valley, perfect for a stop after
   exploring the hilly streets of the area.

Do any of these sound like a good fit for your next biking adventure?

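What the Collection buys us

  1. ✅ The assistant has access to every stored memory (name, biking, bakeries)
  2. ✅ It combines several memories into one personalized suggestion
  3. ✅ "Since you enjoy biking" (from memory 2)
  4. ✅ It recommends bakeries that suit a bike ride (combining memories 2 and 3)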

VIII. Profile vs Collection: A Closer Comparison

8.1 Data Structure

Profile (5.3)

python
# A single object
{
    "user_name": "Lance",
    "user_location": "San Francisco",
    "interests": ["biking", "bakeries"]
}

# How it is stored
namespace = ("memory", user_id)
key = "user_memory"  # fixed key
store.put(namespace, key, profile_data)

Collection (5.4)

python
# Multiple independent objects
[
    {"content": "User's name is Lance."},
    {"content": "Lance likes biking."},
    {"content": "Lance enjoys bakeries."}
]

# How it is stored
namespace = ("memories", user_id)
for memory in memories:
    key = str(uuid.uuid4())  # each memory gets its own key
    store.put(namespace, key, memory)
8.2 Update Operations

Updating a Profile

python
# The whole object is replaced, or individual fields are updated
old_profile = {
    "user_name": "Lance",
    "interests": ["biking"]
}

new_profile = {
    "user_name": "Lance",
    "interests": ["biking", "bakeries"]  # field updated
}

# Overwrite the whole Profile
store.put(namespace, "user_memory", new_profile)

Updating a Collection

python
# A specific memory can be updated, and new memories can be added
# Update memory 1
store.put(namespace, "existing-uuid-1", {
    "content": "Updated content"
})

# Add a new memory
store.put(namespace, str(uuid.uuid4()), {
    "content": "New memory content"
})

# All other existing memories stay unchanged

8.3 Retrieval

Retrieving a Profile

python
# Use get() to fetch the single object
profile = store.get(namespace, "user_memory")
if profile:
    user_name = profile.value.get("user_name")
    interests = profile.value.get("interests")

Retrieving a Collection

python
# Use search() to fetch the memories
memories = store.search(namespace)

# Iterate over the memories
for memory in memories:
    content = memory.value['content']
    print(content)

# Or format them
info = "\n".join(f"- {mem.value['content']}" for mem in memories)

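8.4 Trustcall Parameters

| Parameter | Profile (5.3) | Collection (5.4) |
|---|---|---|
| tools | [UserProfile] | [Memory] |
| tool_choice | "UserProfile" | "Memory" |
| enable_inserts | ❌ Not needed | Must be True |
| existing format | {"UserProfile": data} | [(id, "Memory", data), ...] |

8.5 Use Cases

| Scenario | Recommended | Reason |
|---|---|---|
| User profile | Profile | Fixed, structured fields |
| To-do items | Collection | Many independent tasks |
| System settings | Profile | Well-defined configuration items |
| Study notes | Collection | Open-ended content |
| Conversation records | Collection | Keeps growing |
| Authentication info | Profile | Fixed credentials |

8.6 Performance and Efficiency

| Aspect | Profile | Collection |
|---|---|---|
| Storage efficiency | ✅ Single object | ⚠️ Multiple objects |
| Update efficiency | ⚠️ May rewrite the whole object | ✅ Only changed items are written |
| Query efficiency | ✅ Direct get | ⚠️ Requires a search over the namespace |
| Extensibility | ❌ Limited by the fields | ✅ Grows without schema changes |
| Token usage | ⚠️ The whole profile is passed every time | ✅ A selected subset can be passed |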

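IX. Advanced Topics

9.1 Selective Memory Loading

When the collection is large, there is no need to load every memory into the prompt each time:

python
def call_model_with_filtering(state, config, store):
    user_id = config["configurable"]["user_id"]
    namespace = ("memories", user_id)

    # Fetch the memories; search() returns at most `limit` items (default 10),
    # so raise the limit (1000 is an arbitrary choice) before sorting
    all_memories = store.search(namespace, limit=1000)

    # Pick the N most recently updated memories
    recent_memories = sorted(
        all_memories,
        key=lambda m: m.updated_at,
        reverse=True
    )[:10]  # use only the 10 most recent

    # Or filter by relevance
    # relevant_memories = filter_by_relevance(
    #     memories=all_memories,
    #     current_query=state["messages"][-1].content
    # )

    # Format the memories
    info = "\n".join(f"- {mem.value['content']}" for mem in recent_memories)
    system_msg = MODEL_SYSTEM_MESSAGE.format(memory=info)

    response = model.invoke([SystemMessage(content=system_msg)] + state["messages"])
    return {"messages": response}

Optimization strategies

  1. Recency: use the most recent memories
  2. Relevance: use only memories related to the current conversation
  3. Importance: add an importance field to each memory
  4. Category: filter by memory category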

9.2 Categorizing Memories

Extend the Memory schema to support categories:

python
from typing import Optional
from pydantic import BaseModel, Field

class Memory(BaseModel):
    content: str = Field(description="The main content of the memory")
    category: Optional[str] = Field(
        default=None,
        description="Memory category: personal, preference, event, skill, etc."
    )
    importance: Optional[int] = Field(
        default=5,
        ge=1,
        le=10,
        description="Importance score (1-10)"
    )

Usage example

python
memories = [
    Memory(content="User's name is Lance", category="personal", importance=10),
    Memory(content="Lance likes biking", category="preference", importance=8),
    Memory(content="Lance visited Tartine", category="event", importance=5)
]

Filtering by category

python
def get_memories_by_category(store, user_id, category):
    namespace = ("memories", user_id)
    all_memories = store.search(namespace)

    return [
        mem for mem in all_memories
        if mem.value.get('category') == category
    ]

# Fetch only the preferences
preferences = get_memories_by_category(store, "1", "preference")

9.3 Merging and Deduplicating Memories

To keep the collection from becoming overly redundant:

python
def deduplicate_memories(store, user_id):
    """Drop duplicate or near-duplicate memories."""
    namespace = ("memories", user_id)
    memories = store.search(namespace)

    # Simplified example: a substring check stands in for real similarity
    # (a production version might use an LLM or embeddings)
    unique_memories = {}
    for mem in memories:
        content = mem.value['content'].lower()
        # Treat a memory as a duplicate if one text contains the other
        is_duplicate = False
        for existing_content in unique_memories.keys():
            if content in existing_content or existing_content in content:
                is_duplicate = True
                break

        if not is_duplicate:
            unique_memories[content] = mem

    return list(unique_memories.values())
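
A substring check misses near-duplicates that differ by punctuation or a word or two. As a hedged alternative, the sketch below swaps in fuzzy string matching from the standard library's `difflib`. This is a different technique from the one shown in the tutorial, it operates on plain strings rather than store items, and the 0.9 threshold is an arbitrary assumption to tune.

```python
from difflib import SequenceMatcher

def dedupe(contents, threshold=0.9):
    """Keep the first occurrence of each memory text and drop later texts
    whose similarity ratio to a kept text reaches the threshold."""
    kept = []
    for text in contents:
        norm = text.lower().strip()
        is_duplicate = any(
            SequenceMatcher(None, norm, k.lower().strip()).ratio() >= threshold
            for k in kept
        )
        if not is_duplicate:
            kept.append(text)
    return kept

contents = [
    "Lance likes biking",
    "Lance likes biking.",
    "Lance is planning a trip to Japan",
]

unique = dedupe(contents)
print(unique)
```

Character-level similarity still treats "Lance likes biking" and "Lance enjoys cycling" as different, so semantic duplicates need an LLM or embedding comparison instead.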

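9.4 Automatic Memory Summaries

When there are too many memories, generate a summary automatically:

python
def summarize_memories(model, memories):
    """Compress multiple memories into one summary."""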
    content_list = [mem.value['content'] for mem in memories]
    all_content = "\n".join(f"- {c}" for c in content_list)

    prompt = f"""Please create a concise summary of the following memories:

{all_content}

Summary:"""

    summary = model.invoke([HumanMessage(content=prompt)])
    return summary.content

X. Practical Advice

10.1 When to Use a Collection

✅ Scenarios that suit a Collection

  1. The kind of information is not fixed

    python
    # The user may share any kind of information
    memories = [
        "Lance likes coffee",
        "Lance visited Paris in 2020",
        "Lance is learning French",
        "Lance prefers morning meetings"
    ]
  2. The data needs to keep growing

    python
    # New memories accumulate over time
    day_1 = ["Lance likes biking"]
    day_2 = ["Lance likes biking", "Lance tried Tartine"]
    day_3 = ["Lance likes biking", "Lance tried Tartine", "Lance planning Japan trip"]
  3. Each item needs to be managed independently

    python
    # Each memory can be updated or deleted on its own
    store.delete(namespace, memory_id_1)  # delete a specific memory
    store.put(namespace, memory_id_2, updated_content)  # update a specific memory
  4. The data resembles a log or an event stream

    python
    # Record the user's activity stream
    "User logged in"
    "User completed tutorial"
    "User made first purchase"

10.2 Combining a Profile and a Collection

Best practice: use both together

python
# Profile: structured basic information
profile_namespace = ("profile", user_id)
profile = {
    "user_name": "Lance",
    "user_location": "San Francisco",
    "created_at": "2024-01-01"
}
store.put(profile_namespace, "user_profile", profile)

# Collection: flexible supplementary information
collection_namespace = ("memories", user_id)
memories = [
    {"content": "Lance likes biking"},
    {"content": "Lance visited Tartine bakery"},
    {"content": "Lance is planning a Japan trip"}
]
for memory in memories:
    store.put(collection_namespace, str(uuid.uuid4()), memory)

Using both in the chatbot

python
def call_model(state, config, store):
    user_id = config["configurable"]["user_id"]

    # Load the Profile (store.get returns None when nothing is saved yet)
    profile = store.get(("profile", user_id), "user_profile")
    if profile:
        profile_info = f"Name: {profile.value['user_name']}, Location: {profile.value['user_location']}"
    else:
        profile_info = "No profile saved yet."

    # Load the Collection
    memories = store.search(("memories", user_id))
    memories_info = "\n".join(f"- {m.value['content']}" for m in memories)

    # Combine the information
    system_msg = f"""You are a helpful assistant.

User Profile:
{profile_info}

Additional Memories:
{memories_info}"""

    response = model.invoke([SystemMessage(content=system_msg)] + state["messages"])
    return {"messages": response}

10.3 Memory Management Best Practices

1. Clean up regularly

python
def cleanup_old_memories(store, user_id, days=90):
    """Delete memories older than N days."""
    from datetime import datetime, timedelta, timezone

    namespace = ("memories", user_id)
    memories = store.search(namespace)

    # Item.created_at is a timezone-aware datetime, so use an aware cutoff
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)

    for mem in memories:
        if mem.created_at < cutoff_date:
            store.delete(namespace, mem.key)
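
The date comparison is the part of this cleanup that is easiest to get wrong, so here is a minimal standalone sketch of it. It assumes creation timestamps are timezone-aware `datetime` objects in UTC; the `expired_keys` helper and the sample keys are hypothetical and use a plain dict instead of a store.

```python
from datetime import datetime, timedelta, timezone

def expired_keys(created_at_by_key, days=90, now=None):
    """Return the keys whose creation time is older than `days` days.

    Both sides of the comparison are timezone-aware; comparing an aware
    datetime with a naive one raises TypeError in Python.
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    return [key for key, created in created_at_by_key.items() if created < cutoff]

# Hypothetical creation timestamps keyed by memory ID.
stamps = {
    "old": datetime(2024, 6, 1, tzinfo=timezone.utc),
    "new": datetime(2024, 12, 15, tzinfo=timezone.utc),
}

fixed_now = datetime(2025, 1, 1, tzinfo=timezone.utc)
to_delete = expired_keys(stamps, days=90, now=fixed_now)
print(to_delete)
```

Passing `now` explicitly keeps the function deterministic, which makes the cutoff logic straightforward to test.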

2. Limit the number of memories

python
MAX_MEMORIES = 100

def add_memory_with_limit(store, user_id, new_memory):
    namespace = ("memories", user_id)
    # search() returns at most `limit` items (default 10), so request enough to detect the cap
    memories = store.search(namespace, limit=MAX_MEMORIES)

    if len(memories) >= MAX_MEMORIES:
        # Delete the oldest memory
        oldest = min(memories, key=lambda m: m.created_at)
        store.delete(namespace, oldest.key)

    # Add the new memory
    store.put(namespace, str(uuid.uuid4()), new_memory)
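
If the collection may already exceed the cap, deleting a single item per write is not enough to bring it back under the limit. The sketch below, using a hypothetical `keys_to_evict` helper over a plain dict of creation times, computes every key that has to go so that only the newest items remain.

```python
from datetime import datetime, timezone

def keys_to_evict(created_at_by_key, max_items):
    """Return the oldest keys that must be deleted so that at most
    max_items of the newest entries remain."""
    overflow = len(created_at_by_key) - max_items
    if overflow <= 0:
        return []
    oldest_first = sorted(created_at_by_key, key=created_at_by_key.get)
    return oldest_first[:overflow]

# Hypothetical creation timestamps keyed by memory ID.
created = {
    "a": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "b": datetime(2024, 3, 1, tzinfo=timezone.utc),
    "c": datetime(2024, 2, 1, tzinfo=timezone.utc),
    "d": datetime(2024, 4, 1, tzinfo=timezone.utc),
}

evicted = keys_to_evict(created, max_items=2)
print(evicted)
```

To make room for one incoming memory, call it with `max_items` set to the cap minus one before writing.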

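3. Memory quality control

python
def is_valuable_memory(content: str) -> bool:
    """Decide whether a memory is worth keeping."""
    # Filter out memories that are too short
    if len(content) < 10:
        return False

    # Filter out meaningless memories
    useless_phrases = ["hello", "hi", "ok", "thanks", "bye"]
    if content.lower() in useless_phrases:
        return False

    return True

# Using it in write_memory: keep the json_doc_id key so updates overwrite in place
for r, rmeta in zip(result["responses"], result["response_metadata"]):
    if is_valuable_memory(r.content):
        store.put(namespace, rmeta.get("json_doc_id", str(uuid.uuid4())), r.model_dump())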

十一、常见问题和解决方案

11.1 记忆过多导致上下文过长

问题

python
# 有 1000 条记忆
memories = store.search(namespace)
# len(memories) = 1000

# 全部传给 LLM 会超出上下文限制

解决方案

方案 1:限制数量

python
# 只使用最近的 N 条
recent_memories = sorted(memories, key=lambda m: m.updated_at, reverse=True)[:20]

方案 2:语义搜索

python
# 使用向量数据库查找相关记忆
from langchain.vectorstores import Chroma

# 创建向量存储
vectorstore = Chroma.from_texts(
    [m.value['content'] for m in memories],
    embeddings
)

# 查询相关记忆
current_query = state["messages"][-1].content
relevant_memories = vectorstore.similarity_search(current_query, k=10)

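Option 3: Layered summaries

python
# Summarize older memories; summarize_memories is a helper you would write yourself
old_memories_summary = summarize_memories(model, old_memories)
recent_memories_full = [m.value['content'] for m in recent_memories]

# Join outside the f-string: backslashes inside f-string expressions are a SyntaxError before Python 3.12
recent_text = "\n".join(recent_memories_full)

info = f"""Summary of older memories:
{old_memories_summary}

Recent memories:
{recent_text}"""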
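
To make the layering concrete, here is a minimal sketch of splitting memories by recency and assembling the prompt text. It assumes memories arrive as `(timestamp, content)` pairs; `build_layered_context` and `fake_summarize` are hypothetical names, and the summarizer is a placeholder for what would be an LLM call in practice.

```python
from typing import Callable, List, Tuple


def build_layered_context(
    memories: List[Tuple[float, str]],
    summarize: Callable[[List[str]], str],
    keep_recent: int = 2,
) -> str:
    """Keep the newest memories verbatim and summarize the rest.

    `memories` holds (updated_at_timestamp, content) pairs; `summarize` is a
    hypothetical callable that would wrap an LLM call in a real system.
    """
    ordered = sorted(memories, key=lambda m: m[0], reverse=True)
    recent = [content for _, content in ordered[:keep_recent]]
    older = [content for _, content in ordered[keep_recent:]]

    parts = []
    if older:
        parts.append("Summary of older memories:\n" + summarize(older))
    if recent:
        parts.append("Recent memories:\n" + "\n".join(f"- {c}" for c in recent))
    return "\n\n".join(parts)


def fake_summarize(contents: List[str]) -> str:
    """Placeholder summarizer: reports how many memories were condensed."""
    return f"{len(contents)} older memories condensed."


memories = [
    (1.0, "User's name is Lance."),
    (2.0, "Lance likes to bike around San Francisco."),
    (3.0, "Lance enjoys going to bakeries."),
]
context = build_layered_context(memories, fake_summarize, keep_recent=2)
print(context)
```

The `keep_recent` cutoff is a tuning knob: a larger value preserves more detail at the cost of a longer prompt.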

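11.2 Duplicate memories

Problem

python
# The user mentions the same thing several times
memories = [
    "Lance likes biking",
    "Lance enjoys biking",
    "Lance loves to bike"
]

Solutions

State the requirement explicitly in the Trustcall instruction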

python
TRUSTCALL_INSTRUCTION = """Reflect on the following interaction.

Create or update memories, avoiding duplicates.

If a memory is similar to an existing one, UPDATE the existing memory instead of creating a new one.

Use parallel tool calling to handle updates and insertions simultaneously."""

Or deduplicate periodically

python
def merge_similar_memories(model, store, user_id):
    namespace = ("memories", user_id)
    memories = store.search(namespace)

    # Ask the LLM to identify similar memories
    content_list = [m.value['content'] for m in memories]
    # Build the numbered list first (no backslashes inside f-string expressions before Python 3.12)
    numbered = "\n".join(f"{i}. {c}" for i, c in enumerate(content_list))

    prompt = f"""Identify groups of similar memories that should be merged:

{numbered}

Output groups of similar memory indices."""

    result = model.invoke([HumanMessage(content=prompt)])
    # Process the result and merge...
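
Before spending an LLM call, a cheap string-similarity pass can catch near-verbatim duplicates. The sketch below swaps in a different technique from the LLM prompt above: it uses the standard library's `difflib.SequenceMatcher`. The names `normalize` and `group_near_duplicates` and the 0.8 threshold are illustrative assumptions. True paraphrases (for example "likes" vs "enjoys") may fall below the threshold and would still need the LLM-based approach.

```python
from difflib import SequenceMatcher
from typing import List
import string


def normalize(text: str) -> str:
    """Lowercase, drop punctuation, and collapse whitespace."""
    cleaned = text.lower().translate(str.maketrans("", "", string.punctuation))
    return " ".join(cleaned.split())


def group_near_duplicates(contents: List[str], threshold: float = 0.8) -> List[List[int]]:
    """Greedily group indices whose normalized text is at least `threshold` similar."""
    groups: List[List[int]] = []
    for i, content in enumerate(contents):
        for group in groups:
            # Compare against the first member of each existing group
            representative = contents[group[0]]
            ratio = SequenceMatcher(None, normalize(representative), normalize(content)).ratio()
            if ratio >= threshold:
                group.append(i)
                break
        else:
            # No existing group was similar enough: start a new one
            groups.append([i])
    return groups


contents = [
    "Lance likes biking.",
    "lance likes  biking",
    "User lives in Paris",
]
groups = group_near_duplicates(contents)
print(groups)
```

Each group with more than one index is a merge candidate: keep one key and delete the others.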

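11.3 Memory updates fail

Problem

python
# Trustcall did not recognize which memory should be updated,
# so it created a new memory instead of updating the existing one

Solutions

Make sure existing_memories is formatted correctly

python
# Correct format
existing_memories = [
    (mem.key, "Memory", mem.value)  # use mem.key, not str(i)
    for mem in store.search(namespace)
]

# Wrong format (updates may fail)
existing_memories = [
    (str(i), "Memory", mem.value)  # uses the index instead of the real key
    for i, mem in enumerate(...)
]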
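
Why the key matters can be shown without an LLM. The sketch below assumes, as the save loop in this document does, that an update reports the ID it was given back in `json_doc_id`. A plain dict stands in for one store namespace, and `apply_results` is a hypothetical helper.

```python
import uuid
from typing import Dict, List


def apply_results(store: Dict[str, dict], responses: List[dict], metadata: List[dict]) -> None:
    """Write extractor output into a dict that stands in for one store namespace."""
    for value, meta in zip(responses, metadata):
        # An update carries the ID of the document it patched; an insert has none
        key = meta.get("json_doc_id", str(uuid.uuid4()))
        store[key] = value


# Case 1: the real key was passed as the ID, so the update overwrites in place
store_ok = {"a1b2": {"content": "Lance likes biking."}}
apply_results(
    store_ok,
    [{"content": "Lance likes biking around San Francisco."}],
    [{"json_doc_id": "a1b2"}],
)

# Case 2: the list index "0" was passed as the ID, so the "update" lands under a new key
store_bad = {"a1b2": {"content": "Lance likes biking."}}
apply_results(
    store_bad,
    [{"content": "Lance likes biking around San Francisco."}],
    [{"json_doc_id": "0"}],
)

print(len(store_ok), len(store_bad))
```

In the second case the stale memory survives next to the updated one, which is the duplicate described in the problem statement.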

Give more explicit update instructions

python
TRUSTCALL_INSTRUCTION = """Review the existing memories and the new conversation.

For each piece of information:
1. If it's related to an existing memory, UPDATE that memory
2. If it's completely new information, CREATE a new memory

Prefer updating existing memories over creating new ones when possible."""

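12. Performance Optimization

12.1 Batch operations

Problem: saving memories one at a time is inefficient

Optimization

python
# Less efficient: one call per memory
for memory in memories:
    store.put(namespace, str(uuid.uuid4()), memory)

# Better, if the store supports it: submit all writes in one call.
# LangGraph's BaseStore exposes batch(), which accepts operations such as PutOp
# (check the signature against your installed version)
from langgraph.store.base import PutOp

ops = [
    PutOp(namespace, str(uuid.uuid4()), mem.model_dump())
    for mem in result["responses"]
]
store.batch(ops)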

12.2 Caching strategy

python
from datetime import datetime, timedelta

# Cache memory lookups
_memory_cache = {}
_cache_ttl = timedelta(minutes=5)

def get_memories_cached(store, user_id):
    cache_key = f"memories_{user_id}"

    if cache_key in _memory_cache:
        cached_data, cached_time = _memory_cache[cache_key]
        if datetime.now() - cached_time < _cache_ttl:
            return cached_data

    # Cache miss or expired entry: query the store again
    namespace = ("memories", user_id)
    memories = store.search(namespace)

    _memory_cache[cache_key] = (memories, datetime.now())
    return memories
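
A time-based cache can serve stale memories right after a write. One possible refinement, sketched here under stated assumptions, is to invalidate the entry whenever memories are saved. `MemoryCache` is a hypothetical class, and its `loader` argument stands in for a call such as `store.search(...)`.

```python
import time
from typing import Any, Callable, Dict, Tuple


class MemoryCache:
    """Small TTL cache keyed by user_id, with explicit invalidation on writes."""

    def __init__(self, loader: Callable[[str], Any], ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self._loader = loader          # hypothetical: would call store.search(...)
        self._ttl = ttl_seconds
        self._clock = clock            # injectable so expiry can be tested
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get(self, user_id: str) -> Any:
        entry = self._entries.get(user_id)
        if entry is not None and self._clock() - entry[0] < self._ttl:
            return entry[1]
        # Miss or expired entry: reload and remember when we did so
        value = self._loader(user_id)
        self._entries[user_id] = (self._clock(), value)
        return value

    def invalidate(self, user_id: str) -> None:
        """Call after writing memories so the next read is fresh."""
        self._entries.pop(user_id, None)


calls = []

def load(user_id: str):
    calls.append(user_id)
    return [f"memory for {user_id}"]

cache = MemoryCache(load, ttl_seconds=300.0)
cache.get("1")          # miss: loads from the backing store
cache.get("1")          # hit: served from the cache
cache.invalidate("1")   # a write happened
cache.get("1")          # miss again
print(len(calls))
```

Calling `invalidate(user_id)` at the end of `write_memory` would keep `call_model` from reading a cached list that predates the write.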

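12.3 Asynchronous processing

python
import asyncio
import uuid

async def write_memory_async(state, config, store):
    """Save memories asynchronously."""
    # ... get user_id and prepare inputs ...

    # Call Trustcall asynchronously
    result = await trustcall_extractor.ainvoke({
        "messages": updated_messages,
        "existing": existing_memories
    })

    # Save all memories concurrently
    tasks = [
        store.aput(namespace, rmeta.get("json_doc_id", str(uuid.uuid4())), r.model_dump(mode="json"))
        for r, rmeta in zip(result["responses"], result["response_metadata"])
    ]
    await asyncio.gather(*tasks)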
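
The concurrent-save step can be tried in isolation. This is a runnable sketch, not the library's implementation: `FakeAsyncStore` and `save_all` are hypothetical, and the fake store mimics only the `aput(namespace, key, value)` call shape used above.

```python
import asyncio
import uuid
from typing import Dict, Tuple


class FakeAsyncStore:
    """Hypothetical in-memory stand-in exposing an aput() coroutine."""

    def __init__(self) -> None:
        self.data: Dict[Tuple[Tuple[str, ...], str], dict] = {}

    async def aput(self, namespace: Tuple[str, ...], key: str, value: dict) -> None:
        await asyncio.sleep(0)  # yield control, as a real network write would
        self.data[(namespace, key)] = value


async def save_all(store: FakeAsyncStore, namespace, responses, metadata) -> int:
    """Issue all writes concurrently and return how many were saved."""
    tasks = [
        store.aput(namespace, meta.get("json_doc_id", str(uuid.uuid4())), value)
        for value, meta in zip(responses, metadata)
    ]
    await asyncio.gather(*tasks)
    return len(tasks)


store = FakeAsyncStore()
namespace = ("memories", "1")
responses = [{"content": "User's name is Lance."}, {"content": "Lance likes biking."}]
metadata = [{"json_doc_id": "existing-key"}, {}]
saved = asyncio.run(save_all(store, namespace, responses, metadata))
print(saved, len(store.data))
```

`asyncio.gather` only helps when the writes actually wait on I/O; with a purely in-memory store the gain is negligible.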

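13. Summary

13.1 Key points

  1. A Collection is a set of independent memory items

    • Each memory has a unique UUID key
    • It can grow without a fixed limit
    • It is flexible and open-ended
  2. enable_inserts=True is the key parameter

    • Lets Trustcall insert new memories
    • Still supports updating existing memories
    • Enables updates and inserts in parallel
  3. UUIDs manage memory items

    • uuid.uuid4() generates a unique identifier
    • Each item can be created, read, updated, and deleted independently
    • Key collisions are avoided
  4. Trustcall decides intelligently

    • It decides whether to update or insert
    • json_doc_id marks an update
    • Information stays consistent
  5. Profile and Collection complement each other

    • Profile: core information with a fixed structure
    • Collection: flexible, extensible supplementary information
    • Combining them works best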

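13.2 Skills learned

Python skills

  • Generating and using UUIDs
  • Generator expressions and list comprehensions
  • Parallel iteration with zip()
  • Default values with .get()
  • Using merge_message_runs()

LangChain/LangGraph skills

  • Defining a Collection schema
  • The enable_inserts=True parameter
  • The Store's search() vs get()
  • Advanced Trustcall usage
  • Formatting and displaying memories

System design

  • Choosing between Profile and Collection
  • Memory management strategies
  • Performance optimization techniques
  • Scalability considerations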

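13.3 Profile vs Collection decision tree

Need to store information
        ↓
Is the structure of the information fixed?
├── Yes → Is the number of fields limited?
│         ├── Yes → Profile
│         └── No  → Consider Profile + Collection
└── No  → Does the information keep growing?
          ├── Yes → Collection
          └── No  → Consider Profile (if it really is fixed)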
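
The decision tree can also be written as a small function, which is handy as a checklist in design reviews. This is only an illustrative encoding; `choose_memory_schema` and its parameter names are hypothetical.

```python
def choose_memory_schema(fixed_structure: bool, limited_fields: bool = True,
                         keeps_growing: bool = True) -> str:
    """Encode the decision tree and return a recommendation string."""
    if fixed_structure:
        # Fixed structure: the number of fields decides
        return "Profile" if limited_fields else "Profile + Collection"
    # Open structure: growth decides
    return "Collection" if keeps_growing else "Profile (if the structure really is fixed)"


print(choose_memory_schema(fixed_structure=True, limited_fields=True))
print(choose_memory_schema(fixed_structure=False, keeps_growing=True))
```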

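13.4 Best practices at a glance

| Scenario | Recommendation |
| --- | --- |
| Purely structured data | Use Profile |
| Purely open-ended data | Use Collection |
| Mixed scenarios | Profile + Collection |
| Large numbers of memories | Collection + filtering/summarization |
| Performance-critical | Caching + batch operations |
| Search required | Collection + vector database |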

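13.5 Next steps

Advanced topics

  1. Vector search: retrieve relevant memories with embeddings and a vector database
  2. Memory graphs: build a network of relationships between memories
  3. Automatic archiving: manage the memory lifecycle intelligently
  4. Multimodal memory: images, audio, and more, not only text

Practical applications

  1. Personal AI assistant: combine Profile and Collection
  2. Customer service bot: record user history and preferences
  3. Educational tutoring: track students' learning progress
  4. Content recommendation: personalized recommendations based on memories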

14. Appendix

A.1 Complete code listing

python
# Import dependencies
from IPython.display import Image, display
import uuid
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.memory import InMemoryStore
from langchain_core.messages import merge_message_runs, HumanMessage, SystemMessage
from langchain_core.runnables.config import RunnableConfig
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore
from langchain_openai import ChatOpenAI
from trustcall import create_extractor
from pydantic import BaseModel, Field

# Initialize the model
model = ChatOpenAI(model="gpt-4o", temperature=0)

# Memory Schema
class Memory(BaseModel):
    content: str = Field(
        description="The main content of the memory. For example: User expressed interest in learning about French."
    )

# Create the Trustcall extractor
trustcall_extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True,
)

# System prompts
MODEL_SYSTEM_MESSAGE = """You are a helpful chatbot. You are designed to be a companion to a user.

You have a long term memory which keeps track of information you learn about the user over time.

Current Memory (may include updated memories from this conversation):

{memory}"""

TRUSTCALL_INSTRUCTION = """Reflect on following interaction.

Use the provided tools to retain any necessary memories about the user.

Use parallel tool calling to handle updates and insertions simultaneously:"""

# Define the nodes
def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    namespace = ("memories", user_id)
    memories = store.search(namespace)

    info = "\n".join(f"- {mem.value['content']}" for mem in memories)
    system_msg = MODEL_SYSTEM_MESSAGE.format(memory=info)

    response = model.invoke([SystemMessage(content=system_msg)] + state["messages"])
    return {"messages": response}

def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    namespace = ("memories", user_id)
    existing_items = store.search(namespace)

    tool_name = "Memory"
    existing_memories = (
        [(existing_item.key, tool_name, existing_item.value)
         for existing_item in existing_items]
        if existing_items
        else None
    )

    updated_messages = list(merge_message_runs(
        messages=[SystemMessage(content=TRUSTCALL_INSTRUCTION)] + state["messages"]
    ))

    result = trustcall_extractor.invoke({
        "messages": updated_messages,
        "existing": existing_memories
    })

    for r, rmeta in zip(result["responses"], result["response_metadata"]):
        store.put(
            namespace,
            rmeta.get("json_doc_id", str(uuid.uuid4())),
            r.model_dump(mode="json"),
        )

# Build the graph
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)

# Compile the graph
across_thread_memory = InMemoryStore()
within_thread_memory = MemorySaver()
graph = builder.compile(
    checkpointer=within_thread_memory,
    store=across_thread_memory
)

# Usage example
config = {"configurable": {"thread_id": "1", "user_id": "1"}}
input_messages = [HumanMessage(content="Hi, my name is Lance")]

for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

A.2 Quick reference for common operations

python
# Create a Collection extractor
extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True  # essential
)

# Extract memories (create)
result = extractor.invoke({
    "messages": [...]
})

# Extract memories (update + insert): "existing" goes in the same input dict as "messages"
result = extractor.invoke({
    "messages": [...],
    "existing": [(key, "Memory", data), ...]
})

# Save memories to the Store
for r, rmeta in zip(result["responses"], result["response_metadata"]):
    store.put(
        namespace,
        rmeta.get("json_doc_id", str(uuid.uuid4())),
        r.model_dump(mode="json")
    )

# Retrieve memories (search() returns at most `limit` items, 10 by default)
memories = store.search(namespace)

# Format memories
info = "\n".join(f"- {m.value['content']}" for m in memories)

# Delete a specific memory
store.delete(namespace, memory_key)

A.3 Profile + Collection template

python
def call_model_hybrid(state, config, store):
    """Use Profile and Collection together."""
    user_id = config["configurable"]["user_id"]

    # Load the Profile
    profile = store.get(("profile", user_id), "user_profile")
    if profile:
        profile_info = f"""User Profile:
- Name: {profile.value['user_name']}
- Location: {profile.value['user_location']}"""
    else:
        profile_info = "No profile information available."

    # Load the Collection
    memories = store.search(("memories", user_id))
    if memories:
        memories_info = "Additional Memories:\n" + "\n".join(
            f"- {m.value['content']}" for m in memories
        )
    else:
        memories_info = "No additional memories available."

    # Combine the information
    system_msg = f"""You are a helpful assistant.

{profile_info}

{memories_info}

Use this information to personalize your responses."""

    response = model.invoke([SystemMessage(content=system_msg)] + state["messages"])
    return {"messages": response}

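Document version: 1.0 · Last updated: 2024-11-05 · Author: AI Assistant · Based on: LangChain Academy Module-5 Lesson 5.4

Congratulations on completing all of Module-5! I hope this detailed walkthrough helps you fully master LangGraph's Collection Schema and memory management system! 🎉

Released under the MIT License. Content copyright belongs to the author.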