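5.4 Memory Schema - Collection - Detailed Walkthrough
1. Overview
1.1 About This Section
This is the fourth and final part of LangChain Academy Module 5. It moves memory from a single Profile (one user-profile object) to a Collection (a set of memory items), giving a more flexible and extensible memory management system.
1.2 Profile vs Collection: The Core Difference
This distinction is the key to the whole section, so let's first pin down how the two memory modes differ.
Profile (a single object)
The Profile mode from section 5.3:
# A single user-profile object
{
    "user_name": "Lance",
    "user_location": "San Francisco",
    "interests": ["biking", "bakeries"]
}
Characteristics:
- ✅ Fixed structure
- ✅ Well-defined fields
- ✅ Updates modify specific fields
- ❌ Poorly suited to open-ended information gathering
- ❌ Hard to add new kinds of information
Analogy: a form with a fixed layout.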
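Collection (a set of memories)
The Collection mode in section 5.4:
# Multiple independent memory items
[
    {"content": "User's name is Lance."},
    {"content": "Lance likes to bike around San Francisco."},
    {"content": "Lance enjoys going to bakeries."}
]
Characteristics:
- ✅ Flexible structure
- ✅ Grows without a fixed limit
- ✅ Each memory is stored independently
- ✅ New kinds of memories can be added at any time
- ✅ Supports parallel updates and inserts
Analogy: a notebook you can keep adding notes to.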
1.3 Side-by-Side Comparison
Profile mode (5.3)          Collection mode (5.4)
        ↓                            ↓
┌────────────────┐          ┌────────────────┐
│  User profile  │          │    Memory 1    │
│  ┌──────────┐  │          │  content: ...  │
│  │ name     │  │          └────────────────┘
│  │ location │  │          ┌────────────────┐
│  │ interests│  │          │    Memory 2    │
│  └──────────┘  │          │  content: ...  │
│ Single object  │          └────────────────┘
└────────────────┘          ┌────────────────┐
                            │    Memory 3    │
Update = modify fields      │  content: ...  │
                            └────────────────┘
                            Multiple independent objects
                            Update = modify an existing item
                            Insert = add a new item
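1.4 Learning Objectives
After working through this section, you will understand:
- How to define and use a Collection schema
- What the enable_inserts=True parameter does
- How to manage multiple memory items with UUIDs
- How Trustcall handles updates and inserts at the same time
- When to use a Profile and when to use a Collection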
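1.5 Use Cases
Scenarios that suit a Collection:
Scenario | Why use a Collection |
---|---|
Study notes | New points can be added continuously |
Conversation records | Keep important excerpts from conversations |
User feedback | Collect many separate pieces of feedback |
To-do items | Add and manage multiple tasks |
Open-ended Q&A | Record whatever questions the user asks |
Scenarios that suit a Profile:
Scenario | Why use a Profile |
---|---|
User profile | Fixed fields (name, age, and so on) |
System configuration | Structured settings |
Authentication info | Well-defined credential fields |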
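2. Collection Schema Basics
2.1 Defining the Memory Schema
Start by defining the structure of a single memory:
from pydantic import BaseModel, Field
class Memory(BaseModel):
    content: str = Field(
        description="The main content of the memory. For example: User expressed interest in learning about French."
    )
Design notes:
Minimal design:
- Only one field: content
- No predefined categories or tags
- Maximum flexibility
Descriptive guidance:
- The description field tells the LLM how to use the field
- An example is included to aid understanding
Open-ended content:
- Can store any kind of information
- No structural constraints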
2.2 Defining the MemoryCollection Schema
Define a collection of memories (used for batch extraction):
class MemoryCollection(BaseModel):
    memories: list[Memory] = Field(
        description="A list of memories about the user."
    )
Purpose:
- Passed to with_structured_output() to extract several memories in one call
- The model returns a list containing multiple Memory objects
Python note:
The type annotation list[Memory]:
# Python 3.9+
memories: list[Memory]
# Python 3.8 and earlier
from typing import List
memories: List[Memory]
2.3 Why This Design?
It is simple, yet it buys a lot:
Maximum flexibility:
# Any kind of information can be stored
Memory(content="User's name is Lance.")
Memory(content="Lance likes biking.")
Memory(content="Lance visited Paris in 2023.")
Memory(content="Lance prefers dark mode in apps.")
Easy to extend:
# More fields can be added later if needed
from datetime import datetime
from typing import Optional
class Memory(BaseModel):
    content: str
    category: Optional[str] = None    # optional category
    importance: Optional[int] = None  # optional importance score
    timestamp: datetime = Field(default_factory=datetime.now)
Complements a Profile:
# Profile: structured, fixed information
profile = {
    "user_name": "Lance",
    "user_location": "San Francisco"
}
# Collection: flexible, open-ended information
memories = [
    {"content": "Lance likes biking."},
    {"content": "Lance visited Tartine bakery."},
    {"content": "Lance is planning a trip to Japan."}
]
3. Extracting a Collection with with_structured_output
3.1 Basic Extraction
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
# Initialize the model
model = ChatOpenAI(model="gpt-4o", temperature=0)
# Bind the schema to the model
model_with_structure = model.with_structured_output(MemoryCollection)
# Invoke the model to extract memories
memory_collection = model_with_structure.invoke([
    HumanMessage("My name is Lance. I like to bike.")
])
# Inspect the result
print(memory_collection.memories)
Output:
[
    Memory(content="User's name is Lance."),
    Memory(content='Lance likes to bike.')
]
Analysis:
- The model splits the input into two independent memories
- Each memory is a Memory object
- The result conforms to the MemoryCollection structure
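3.2 Serializing to a Dictionary
# A single memory
memory = memory_collection.memories[0]
print(memory.model_dump())
# Output: {'content': "User's name is Lance."}
# All memories
for mem in memory_collection.memories:
    print(mem.model_dump())
What model_dump() does:
- Converts a Pydantic model into a Python dictionary
- Makes the data easy to save to a Store or database
- Produces a value that can be serialized to JSON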
3.3 Saving to the Store
Each memory item in a Collection is saved independently, with a unique UUID as its key:
import uuid
from langgraph.store.memory import InMemoryStore
# Initialize the Store
in_memory_store = InMemoryStore()
# Define the namespace
user_id = "1"
namespace_for_memory = (user_id, "memories")
# Save the first memory
key = str(uuid.uuid4())  # generate a unique ID
value = memory_collection.memories[0].model_dump()
in_memory_store.put(namespace_for_memory, key, value)
# Save the second memory
key = str(uuid.uuid4())  # another unique ID
value = memory_collection.memories[1].model_dump()
in_memory_store.put(namespace_for_memory, key, value)
Why use UUIDs?
Fixed key | UUID |
---|---|
Holds only one memory | Holds any number of memories |
A new memory overwrites the old one | Each memory is stored independently |
Suits a Profile | Suits a Collection |
Storage layout:
Store
└── ("1", "memories")  ← namespace
    ├── "e1c4e5ab-..." → {"content": "User's name is Lance."}
    └── "e132a1ea-..." → {"content": "Lance likes to bike."}
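The difference between a fixed key and UUID keys can be sketched without any LangGraph dependency. The snippet below is a minimal illustration only: a plain dict stands in for the store, and the helpers `put` and `search` are hypothetical names, not the `InMemoryStore` API.

```python
import uuid

# A plain dict stands in for the store: {(namespace, key): value}.
store = {}

def put(namespace, key, value):
    """Write a value; an existing (namespace, key) pair is overwritten."""
    store[(namespace, key)] = value

def search(namespace):
    """Return every value stored under the namespace."""
    return [value for (ns, _), value in store.items() if ns == namespace]

profile_ns = ("1", "profile")
memories_ns = ("1", "memories")

# Fixed key: the second write replaces the first.
put(profile_ns, "user_memory", {"content": "User's name is Lance."})
put(profile_ns, "user_memory", {"content": "Lance likes to bike."})

# UUID keys: each write lands in its own slot.
put(memories_ns, str(uuid.uuid4()), {"content": "User's name is Lance."})
put(memories_ns, str(uuid.uuid4()), {"content": "Lance likes to bike."})

print(len(search(profile_ns)))   # 1
print(len(search(memories_ns)))  # 2
```

With the fixed key only the most recent write survives, which is the behavior a Profile wants and a Collection must avoid.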
3.4 Searching Memories
# Search for every memory in the namespace
for m in in_memory_store.search(namespace_for_memory):
    print(m.dict())
Output:
{
'value': {'content': "User's name is Lance."},
'key': 'e1c4e5ab-ab0f-4cbb-822d-f29240a983af',
'namespace': ['1', 'memories'],
'created_at': '2024-10-30T21:43:26.893775+00:00',
'updated_at': '2024-10-30T21:43:26.893779+00:00'
}
{
'value': {'content': 'Lance likes to bike.'},
'key': 'e132a1ea-6202-43ac-a9a6-3ecf2c1780a8',
'namespace': ['1', 'memories'],
'created_at': '2024-10-30T21:43:26.893833+00:00',
'updated_at': '2024-10-30T21:43:26.893834+00:00'
}
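4. Trustcall's enable_inserts Parameter
4.1 Why Is enable_inserts Needed?
In section 5.3, Trustcall was used in its default mode, which only updates a single object:
# Usage in 5.3 (Profile)
extractor = create_extractor(
    model,
    tools=[UserProfile],
    tool_choice="UserProfile"
    # no enable_inserts
)
For a Collection, we need to both add new items and update existing ones:
# Usage in 5.4 (Collection)
extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True  # the key setting
)
4.2 What enable_inserts Does
enable_inserts=True tells Trustcall that it:
- ✅ May create new memory items
- ✅ May update existing memory items
- ✅ May do both in the same call (parallel operations)
Without enable_inserts (when existing items are supplied):
- Existing items can only be updated
- No new items are added alongside them
- Suits the Profile scenario
With enable_inserts:
- New items can be added
- Existing items can still be updated
- Suits the Collection scenario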
4.3 Creating the Trustcall Extractor
from trustcall import create_extractor
# Define the Memory schema
class Memory(BaseModel):
    content: str = Field(
        description="The main content of the memory. For example: User expressed interest in learning about French."
    )
# Create the extractor
trustcall_extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True,  # enable inserts
)
5. Handling a Collection with Trustcall
5.1 Basic Extraction (Creating New Memories)
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
# Extraction instruction
instruction = """Extract memories from the following conversation:"""
# Conversation
conversation = [
    HumanMessage(content="Hi, I'm Lance."),
    AIMessage(content="Nice to meet you, Lance."),
    HumanMessage(content="This morning I had a nice bike ride in San Francisco.")
]
# Invoke the extractor
result = trustcall_extractor.invoke({
    "messages": [SystemMessage(content=instruction)] + conversation
})
Inspect the messages (tool calls):
for m in result["messages"]:
    m.pretty_print()
Output:
================================== Ai Message ==================================
Tool Calls:
  Memory (call_Pj4kctFlpg9TgcMBfMH33N30)
 Call ID: call_Pj4kctFlpg9TgcMBfMH33N30
  Args:
    content: Lance had a nice bike ride in San Francisco this morning.
Inspect the responses (parsed objects):
for m in result["responses"]:
    print(m)
Output:
content='Lance had a nice bike ride in San Francisco this morning.'
Inspect the metadata:
for m in result["response_metadata"]:
    print(m)
Output:
{'id': 'call_Pj4kctFlpg9TgcMBfMH33N30'}
5.2 Updates and Inserts (the Core Feature)
Now let's see how Trustcall updates existing memories and inserts new ones in the same call.
5.2.1 Preparing the Existing Memories
# The continued conversation
updated_conversation = [
    AIMessage(content="That's great, did you do after?"),
    HumanMessage(content="I went to Tartine and ate a croissant."),
    AIMessage(content="What else is on your mind?"),
    HumanMessage(content="I was thinking about my Japan, and going back this winter!"),
]
# Update instruction
system_msg = """Update existing memories and create new ones based on the following conversation:"""
# Prepare the existing memories
tool_name = "Memory"
existing_memories = [
    (str(i), tool_name, memory.model_dump())
    for i, memory in enumerate(result["responses"])
] if result["responses"] else None
print(existing_memories)
Output:
[
    (
        '0',       # ID
        'Memory',  # tool name
        {'content': 'Lance had a nice bike ride in San Francisco this morning.'}  # data
    )
]
Data format:
Trustcall expects existing data in this shape:
[
    (id, tool_name, data),
    (id, tool_name, data),
    ...
]
- id: a string that identifies the memory item (used for updates)
- tool_name: the tool name ("Memory")
- data: the data as a dictionary
Python note:
The list comprehension above, written out in full:
existing_memories = [
    (str(i), tool_name, memory.model_dump())
    for i, memory in enumerate(result["responses"])
] if result["responses"] else None
# is equivalent to:
if result["responses"]:
    existing_memories = []
    for i, memory in enumerate(result["responses"]):
        existing_memories.append((str(i), tool_name, memory.model_dump()))
else:
    existing_memories = None
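5.2.2 Calling Trustcall to Update and Insert
# Invoke the extractor
# (system_msg defined above is not passed in this call; only the conversation and the existing memories are)
result = trustcall_extractor.invoke({
    "messages": updated_conversation,
    "existing": existing_memories
})
Inspect the tool calls:
for m in result["messages"]:
    m.pretty_print()
Output: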
================================== Ai Message ==================================
Tool Calls:
Memory (call_vxks0YH1hwUxkghv4f5zdkTr)
Call ID: call_vxks0YH1hwUxkghv4f5zdkTr
Args:
content: Lance had a nice bike ride in San Francisco this morning. He went to Tartine and ate a croissant. He was thinking about his trip to Japan and going back this winter!
Memory (call_Y4S3poQgFmDfPy2ExPaMRk8g)
Call ID: call_Y4S3poQgFmDfPy2ExPaMRk8g
Args:
content: Lance went to Tartine and ate a croissant. He was thinking about his trip to Japan and going back this winter!
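Key observations:
- There are two Memory tool calls
- The first contains all of the information (an update)
- The second contains only the new information (an insert)
Inspect the responses:
for m in result["responses"]:
    print(m)
Output: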
content='Lance had a nice bike ride in San Francisco this morning. He went to Tartine and ate a croissant. He was thinking about his trip to Japan and going back this winter!'
content='Lance went to Tartine and ate a croissant. He was thinking about his trip to Japan and going back this winter!'
Inspect the metadata (this is the important part):
for m in result["response_metadata"]:
    print(m)
Output:
{'id': 'call_vxks0YH1hwUxkghv4f5zdkTr', 'json_doc_id': '0'}
{'id': 'call_Y4S3poQgFmDfPy2ExPaMRk8g'}
What the metadata tells us:
Response | Metadata | Meaning |
---|---|---|
First | 'json_doc_id': '0' | Updates the existing memory (ID '0') |
Second | No json_doc_id | Inserts a new memory |
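5.3 How Trustcall Decides
Trustcall decides on its own which memories to update and which to insert:
Original memory:
- Lance had a nice bike ride in San Francisco this morning.
New conversation:
- I went to Tartine and ate a croissant.
- I was thinking about Japan, and going back this winter!
Trustcall's decision:
┌─────────────────────────────────────────────────┐
│ Update memory 0:                                │
│ "Lance had a nice bike ride... He went to       │
│  Tartine... He was thinking about his trip..."  │
│ (merges old and new information)                │
└─────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────┐
│ Insert new memory:                              │
│ "Lance went to Tartine... He was thinking       │
│  about his trip to Japan..."                    │
│ (a separate new memory)                         │
└─────────────────────────────────────────────────┘
Why is it designed this way?
- Consolidation: related information is folded into the same memory
- Independence: unrelated or standalone events become new memories
- Less redundancy: the aim is to avoid exact duplicates, although in this run the updated and inserted memories still overlap (section 11.2 discusses tightening the instruction)
- Flexibility: the choice is made automatically from the content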
6. Building a Chatbot with a Collection Schema
6.1 Full Implementation
from IPython.display import Image, display
import uuid
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.memory import InMemoryStore
from langchain_core.messages import merge_message_runs, HumanMessage, SystemMessage
from langchain_core.runnables.config import RunnableConfig
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore
from langchain_openai import ChatOpenAI
from trustcall import create_extractor
from pydantic import BaseModel, Field
# Initialize the model
model = ChatOpenAI(model="gpt-4o", temperature=0)
# Memory schema
class Memory(BaseModel):
    content: str = Field(
        description="The main content of the memory. For example: User expressed interest in learning about French."
    )
# Create the Trustcall extractor
trustcall_extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True,  # allow new memories to be inserted
)
# System prompts
MODEL_SYSTEM_MESSAGE = """You are a helpful chatbot. You are designed to be a companion to a user.
You have a long term memory which keeps track of information you learn about the user over time.
Current Memory (may include updated memories from this conversation):
{memory}"""
TRUSTCALL_INSTRUCTION = """Reflect on following interaction.
Use the provided tools to retain any necessary memories about the user.
Use parallel tool calling to handle updates and insertions simultaneously:"""
6.2 The call_model Node
def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Load memories from the store and use them to personalize the chatbot's response"""
    # Get the user ID
    user_id = config["configurable"]["user_id"]
    # Retrieve memories from the store
    namespace = ("memories", user_id)
    memories = store.search(namespace)
    # Format the memories for the system prompt
    info = "\n".join(f"- {mem.value['content']}" for mem in memories)
    system_msg = MODEL_SYSTEM_MESSAGE.format(memory=info)
    # Generate a response from the memories and the chat history
    response = model.invoke([
        SystemMessage(content=system_msg)
    ] + state["messages"])
    return {"messages": response}
Code analysis:
Retrieving all memories:
namespace = ("memories", user_id)
memories = store.search(namespace)
- Uses search() rather than get()
- A Collection holds multiple memory items
- Returns the memories stored under the namespace
Formatting the memories:
info = "\n".join(f"- {mem.value['content']}" for mem in memories)
system_msg = MODEL_SYSTEM_MESSAGE.format(memory=info)
Formatting example:
# memories = [
#     Item(value={'content': "User's name is Lance."}),
#     Item(value={'content': 'Lance likes to bike around San Francisco.'})
# ]
# info is then the two-line string:
# - User's name is Lance.
# - Lance likes to bike around San Francisco.
The generator expression in detail:
"\n".join(f"- {mem.value['content']}" for mem in memories)
# is equivalent to:
result = []
for mem in memories:
    result.append(f"- {mem.value['content']}")
info = "\n".join(result)
6.3 The write_memory Node
def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Reflect on the chat history and update the memory collection"""
    # Get the user ID
    user_id = config["configurable"]["user_id"]
    # Define the namespace
    namespace = ("memories", user_id)
    # Retrieve the existing memories for context
    existing_items = store.search(namespace)
    # Format the existing memories for the Trustcall extractor
    tool_name = "Memory"
    existing_memories = (
        [(existing_item.key, tool_name, existing_item.value)
         for existing_item in existing_items]
        if existing_items
        else None
    )
    # Merge the chat history with the instruction
    updated_messages = list(merge_message_runs(
        messages=[SystemMessage(content=TRUSTCALL_INSTRUCTION)] + state["messages"]
    ))
    # Invoke the extractor
    result = trustcall_extractor.invoke({
        "messages": updated_messages,
        "existing": existing_memories
    })
    # Save the memories returned by Trustcall to the store
    for r, rmeta in zip(result["responses"], result["response_metadata"]):
        store.put(
            namespace,
            rmeta.get("json_doc_id", str(uuid.uuid4())),  # the key line
            r.model_dump(mode="json"),
        )
Step-by-step walkthrough:
Retrieving the existing memories:
existing_items = store.search(namespace)
- Fetches the existing memory items
- They serve as context for Trustcall
Converting to Trustcall's format:
tool_name = "Memory"
existing_memories = (
    [(existing_item.key, tool_name, existing_item.value)
     for existing_item in existing_items]
    if existing_items
    else None
)
Example of the formatted result:
[
    ('dee65880-...', 'Memory', {'content': "User's name is Lance."}),
    ('662195fc-...', 'Memory', {'content': 'Lance likes to bike...'})
]
Merging the messages:
updated_messages = list(merge_message_runs(
    messages=[SystemMessage(content=TRUSTCALL_INSTRUCTION)] + state["messages"]
))
What merge_message_runs() does:
# Before merging:
[
    HumanMessage("Hi"),
    HumanMessage("How are you?"),
    AIMessage("I'm good"),
    AIMessage("Thanks for asking")
]
# After merging:
[
    HumanMessage("Hi\nHow are you?"),
    AIMessage("I'm good\nThanks for asking")
]
- Merges consecutive messages of the same role
- Reduces the number of messages
- Keeps the prompt sent to the extractor compact
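The merging behavior described above can be mimicked with the standard library. This is a minimal sketch on plain `(role, text)` tuples, not the `merge_message_runs` implementation; the function name `merge_runs` is hypothetical.

```python
from itertools import groupby

def merge_runs(messages):
    """Merge consecutive (role, text) pairs that share a role, joining the text with newlines."""
    merged = []
    # groupby only groups adjacent items, which is exactly the "consecutive run" behavior.
    for role, group in groupby(messages, key=lambda message: message[0]):
        merged.append((role, "\n".join(text for _, text in group)))
    return merged

messages = [
    ("human", "Hi"),
    ("human", "How are you?"),
    ("ai", "I'm good"),
    ("ai", "Thanks for asking"),
]
merged = merge_runs(messages)
print(merged)
```

Four messages collapse into two, one per run of the same role, while the order of the runs is preserved.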
Calling Trustcall:
result = trustcall_extractor.invoke({
    "messages": updated_messages,
    "existing": existing_memories
})
Saving the results (the key logic):
for r, rmeta in zip(result["responses"], result["response_metadata"]):
    store.put(
        namespace,
        rmeta.get("json_doc_id", str(uuid.uuid4())),
        r.model_dump(mode="json"),
    )
The core logic:
rmeta.get("json_doc_id", str(uuid.uuid4()))
This one expression decides between an update and an insert:
Case | rmeta contents | Key used | Operation |
---|---|---|---|
Update an existing memory | {'id': '...', 'json_doc_id': '0'} | The existing key (from existing_memories) | Update |
Insert a new memory | {'id': '...'} | A new UUID | Insert |
Example:
# Case 1: update
rmeta = {'id': 'call_xxx', 'json_doc_id': 'dee65880-...'}
key = rmeta.get("json_doc_id", str(uuid.uuid4()))
# key = 'dee65880-...' (the existing key)
# → updates the existing memory
# Case 2: insert
rmeta = {'id': 'call_yyy'}
key = rmeta.get("json_doc_id", str(uuid.uuid4()))
# key = a new UUID (for example 'f1234567-...')
# → inserts a new memory
Python notes:
The zip() function:
responses = [mem1, mem2]
metadata = [meta1, meta2]
for r, rmeta in zip(responses, metadata):
    # First iteration:  r=mem1, rmeta=meta1
    # Second iteration: r=mem2, rmeta=meta2
    process(r, rmeta)
Default values with .get():
d = {'a': 1}
d.get('a', 0)  # returns 1 (key exists)
d.get('b', 0)  # returns 0 (key missing, default used)
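Putting `zip()` and `.get()` together, the update-or-insert step can be exercised in isolation. The sketch below is an illustration under simplifying assumptions: a plain dict replaces the store, responses are plain dicts rather than Pydantic objects, and `save_results` is a hypothetical helper name.

```python
import uuid

def save_results(store, responses, response_metadata):
    """Write each response under its json_doc_id if present, otherwise under a fresh UUID."""
    for response, meta in zip(responses, response_metadata):
        key = meta.get("json_doc_id", str(uuid.uuid4()))
        store[key] = response
    return store

# One memory already exists under a known key.
store = {"dee65880": {"content": "Lance likes to bike."}}

responses = [
    {"content": "Lance likes to bike around San Francisco."},  # update
    {"content": "Lance enjoys going to bakeries."},            # insert
]
response_metadata = [
    {"id": "call_xxx", "json_doc_id": "dee65880"},
    {"id": "call_yyy"},
]

save_results(store, responses, response_metadata)
print(len(store))  # 2
```

The first response overwrites the existing entry because its metadata carries `json_doc_id`; the second has no such field, so it is stored under a newly generated key.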
6.4 Building the Graph
# Define the graph
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)
# Long-term memory (across threads)
across_thread_memory = InMemoryStore()
# Short-term memory (within a thread)
within_thread_memory = MemorySaver()
# Compile the graph
graph = builder.compile(
    checkpointer=within_thread_memory,
    store=across_thread_memory
)
Graph structure:
START → call_model → write_memory → END
7. Hands-On Demo
7.1 First Interaction: Creating the First Memory
# Configuration
config = {
    "configurable": {
        "thread_id": "1",
        "user_id": "1"
    }
}
# User input
input_messages = [HumanMessage(content="Hi, my name is Lance")]
# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
Output:
================================ Human Message =================================
Hi, my name is Lance
================================== Ai Message ==================================
Hi Lance! It's great to meet you. How can I assist you today?
7.2 Second Interaction: Adding a New Memory
# User input
input_messages = [HumanMessage(content="I like to bike around San Francisco")]
# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
Output:
================================ Human Message =================================
I like to bike around San Francisco
================================== Ai Message ==================================
That sounds like a lot of fun! San Francisco has some beautiful routes for biking.
Do you have a favorite trail or area you like to explore?
7.3 Inspecting the Memory Collection
# Define the namespace
user_id = "1"
namespace = ("memories", user_id)
# Search for all memories
memories = across_thread_memory.search(namespace)
# Print each memory
for m in memories:
    print(m.dict())
Output:
{
'value': {'content': "User's name is Lance."},
'key': 'dee65880-dd7d-4184-8ca1-1f7400f7596b',
'namespace': ['memories', '1'],
'created_at': '2024-10-30T22:18:52.413283+00:00',
'updated_at': '2024-10-30T22:18:52.413284+00:00'
}
{
'value': {'content': 'User likes to bike around San Francisco.'},
'key': '662195fc-8ea4-4f64-a6b6-6b86d9cb85c0',
'namespace': ['memories', '1'],
'created_at': '2024-10-30T22:18:56.597813+00:00',
'updated_at': '2024-10-30T22:18:56.597814+00:00'
}
Analysis:
- There are two independent memories
- Each memory has a unique UUID key
- One records the name, the other the hobby
7.4 Adding More Memories
# User input
input_messages = [HumanMessage(content="I also enjoy going to bakeries")]
# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
Output:
================================ Human Message =================================
I also enjoy going to bakeries
================================== Ai Message ==================================
Biking and bakeries make a great combination! Do you have a favorite bakery in
San Francisco, or are you on the hunt for new ones to try?
There should now be three memories along these lines:
- User's name is Lance.
- Lance likes to bike around San Francisco.
- Lance enjoys going to bakeries.
7.5 Testing Across Sessions
# A new thread, the same user
config = {
    "configurable": {
        "thread_id": "2",  # new session
        "user_id": "1"     # same user
    }
}
# User input
input_messages = [HumanMessage(content="What bakeries do you recommend for me?")]
# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
Output:
================================ Human Message =================================
What bakeries do you recommend for me?
================================== Ai Message ==================================
Since you enjoy biking around San Francisco, you might like to check out some of
these bakeries that are both delicious and located in areas that are great for a
bike ride:
1. **Tartine Bakery** - Located in the Mission District, it's famous for its
bread and pastries. The area is vibrant and perfect for a leisurely ride.
2. **Arsicault Bakery** - Known for its incredible croissants, it's in the
Richmond District, which offers a nice ride through Golden Gate Park.
3. **B. Patisserie** - Situated in Lower Pacific Heights, this bakery is renowned
for its kouign-amann and other French pastries. The neighborhood is charming
and bike-friendly.
4. **Mr. Holmes Bakehouse** - Famous for its cruffins, it's located in the
Tenderloin, which is a bit more urban but still accessible by bike.
5. **Noe Valley Bakery** - A cozy spot in Noe Valley, perfect for a stop after
exploring the hilly streets of the area.
Do any of these sound like a good fit for your next biking adventure?
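What the Collection makes possible:
- ✅ The AI has access to all stored memories (name, biking, bakeries)
- ✅ It combines several memories into a personalized suggestion
- ✅ "Since you enjoy biking" (from memory 2)
- ✅ It recommends bakeries that are good to reach by bike (combining memories 2 and 3)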
8. Profile vs Collection in Depth
8.1 Data Structures
Profile (5.3):
# A single object
{
    "user_name": "Lance",
    "user_location": "San Francisco",
    "interests": ["biking", "bakeries"]
}
# How it is stored
namespace = ("memory", user_id)
key = "user_memory"  # fixed key
store.put(namespace, key, profile_data)
Collection (5.4):
# Multiple independent objects
[
    {"content": "User's name is Lance."},
    {"content": "Lance likes biking."},
    {"content": "Lance enjoys bakeries."}
]
# How it is stored
namespace = ("memories", user_id)
for memory in memories:
    key = str(uuid.uuid4())  # each memory gets a unique key
    store.put(namespace, key, memory)
8.2 Update Operations
Updating a Profile:
# The whole object is replaced, or individual fields are updated
old_profile = {
    "user_name": "Lance",
    "interests": ["biking"]
}
new_profile = {
    "user_name": "Lance",
    "interests": ["biking", "bakeries"]  # field updated
}
# Overwrite the whole Profile
store.put(namespace, "user_memory", new_profile)
Updating a Collection:
# A specific memory can be updated, and new memories can be added
# Update memory 1
store.put(namespace, "existing-uuid-1", {
    "content": "Updated content"
})
# Add a new memory
store.put(namespace, str(uuid.uuid4()), {
    "content": "New memory content"
})
# All other existing memories stay unchanged
8.3 Retrieval
Retrieving a Profile:
# Use get() to fetch the single object
profile = store.get(namespace, "user_memory")
if profile:
    user_name = profile.value.get("user_name")
    interests = profile.value.get("interests")
Retrieving a Collection:
# Use search() to fetch the memories
memories = store.search(namespace)
# Iterate over the memories
for memory in memories:
    content = memory.value['content']
    print(content)
# Or format them
info = "\n".join(f"- {mem.value['content']}" for mem in memories)
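8.4 Trustcall Parameters
Parameter | Profile (5.3) | Collection (5.4) |
---|---|---|
tools | [UserProfile] | [Memory] |
tool_choice | "UserProfile" | "Memory" |
enable_inserts | ❌ Not needed | ✅ Required (True) |
existing format | {"UserProfile": data} | [(id, "Memory", data), ...] |
8.5 Use Cases
Scenario | Recommended | Reason |
---|---|---|
User profile | Profile | Fixed fields, structured |
To-do items | Collection | Many independent tasks |
System settings | Profile | Well-defined configuration items |
Study notes | Collection | Open-ended content |
Conversation records | Collection | Keeps growing |
Authentication info | Profile | Fixed credentials |
8.6 Performance and Efficiency
Aspect | Profile | Collection |
---|---|---|
Storage efficiency | ✅ One object | ⚠️ Many objects |
Update efficiency | ⚠️ May rewrite the whole object | ✅ Only changed items are written |
Query efficiency | ✅ Direct get | ⚠️ Requires a search over the namespace |
Extensibility | ❌ Limited by the fields | ✅ Grows without a fixed limit |
Token usage | ⚠️ The whole object is passed every time | ✅ Items can be passed selectively |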
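9. Advanced Topics
9.1 Selective Memory Loading
When the collection grows large, there is no need to load every memory into the prompt each time:
def call_model_with_filtering(state, config, store):
    user_id = config["configurable"]["user_id"]
    namespace = ("memories", user_id)
    # Fetch the memories
    all_memories = store.search(namespace)
    # Keep the N most recently updated memories
    recent_memories = sorted(
        all_memories,
        key=lambda m: m.updated_at,
        reverse=True
    )[:10]  # use only the 10 most recent
    # Or filter by relevance (filter_by_relevance is a placeholder you would implement)
    # relevant_memories = filter_by_relevance(
    #     memories=all_memories,
    #     current_query=state["messages"][-1].content
    # )
    # Format the memories
    info = "\n".join(f"- {mem.value['content']}" for mem in recent_memories)
    system_msg = MODEL_SYSTEM_MESSAGE.format(memory=info)
    response = model.invoke([SystemMessage(content=system_msg)] + state["messages"])
    return {"messages": response}
Optimization strategies:
- Recency: use the most recent memories
- Relevance filtering: use only memories related to the current conversation
- Importance scoring: add an importance field to each memory
- Category filtering: select memories by category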
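As one possible reading of the relevance-filtering strategy, the sketch below ranks memories by word overlap with the current query. It is a deliberately naive stand-in, assuming memories are plain strings; `tokenize` and `filter_by_relevance` are hypothetical helpers, and a real system would more likely use embeddings.

```python
import re

def tokenize(text):
    """Lowercase the text and return its set of alphanumeric words."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def filter_by_relevance(memories, current_query, k=2):
    """Keep up to k memories that share at least one word with the query, best overlap first."""
    query_tokens = tokenize(current_query)
    scored = []
    for content in memories:
        overlap = len(query_tokens & tokenize(content))
        if overlap > 0:
            scored.append((overlap, content))
    # sort() is stable, so memories with equal overlap keep their original order.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [content for _, content in scored[:k]]

memories = [
    "User's name is Lance.",
    "Lance likes to bike around San Francisco.",
    "Lance enjoys going to bakeries.",
]
relevant = filter_by_relevance(memories, "What bakeries do you recommend for me?")
print(relevant)  # ['Lance enjoys going to bakeries.']
```

Exact word matching misses paraphrases ("pastry shop" would not match "bakeries"), which is the main reason to move to semantic search once the collection grows.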
9.2 Categorizing Memories
Extend the Memory schema to support categories:
from typing import Optional
class Memory(BaseModel):
    content: str = Field(description="The main content of the memory")
    category: Optional[str] = Field(
        default=None,
        description="Memory category: personal, preference, event, skill, etc."
    )
    importance: Optional[int] = Field(
        default=5,
        ge=1,
        le=10,
        description="Importance score (1-10)"
    )
Usage example:
memories = [
    Memory(content="User's name is Lance", category="personal", importance=10),
    Memory(content="Lance likes biking", category="preference", importance=8),
    Memory(content="Lance visited Tartine", category="event", importance=5)
]
Filtering by category:
def get_memories_by_category(store, user_id, category):
    namespace = ("memories", user_id)
    all_memories = store.search(namespace)
    return [
        mem for mem in all_memories
        if mem.value.get('category') == category
    ]
# Fetch only the preferences
preferences = get_memories_by_category(store, "1", "preference")
9.3 Merging and Deduplicating Memories
To keep the collection from becoming overly redundant:
def deduplicate_memories(store, user_id):
    """Remove duplicate or near-duplicate memories"""
    namespace = ("memories", user_id)
    memories = store.search(namespace)
    # Simplified example: a substring check stands in for real similarity detection.
    # A production version might use an LLM or embeddings instead.
    unique_memories = {}
    for mem in memories:
        content = mem.value['content'].lower()
        # Naive similarity check: one text contains the other
        is_duplicate = False
        for existing_content in unique_memories.keys():
            if content in existing_content or existing_content in content:
                is_duplicate = True
                break
        if not is_duplicate:
            unique_memories[content] = mem
    return list(unique_memories.values())
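A slightly more tolerant check than substring containment can be sketched with the standard library's `difflib`. This is an illustration under stated assumptions: memories are plain strings, the 0.8 threshold is arbitrary, and `is_similar` and `deduplicate` are hypothetical helper names.

```python
from difflib import SequenceMatcher

def is_similar(a, b, threshold=0.8):
    """Compare two strings character by character, ignoring case."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio() >= threshold

def deduplicate(contents, threshold=0.8):
    """Keep the first occurrence of each group of near-identical strings."""
    unique = []
    for content in contents:
        if not any(is_similar(content, kept, threshold) for kept in unique):
            unique.append(content)
    return unique

contents = [
    "Lance likes biking",
    "lance likes biking.",
    "Lance visited Paris in 2023.",
]
print(deduplicate(contents))
```

Character-level similarity catches differences in case and punctuation but not paraphrases, so memories that say the same thing in different words still need an LLM or embedding comparison.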
9.4 Automatic Summarization
When there are too many memories, condense them into a summary:
def summarize_memories(model, memories):
    """Compress multiple memories into a summary"""
    content_list = [mem.value['content'] for mem in memories]
    all_content = "\n".join(f"- {c}" for c in content_list)
    prompt = f"""Please create a concise summary of the following memories:
{all_content}
Summary:"""
    summary = model.invoke([HumanMessage(content=prompt)])
    return summary.content
10. Practical Advice
10.1 When to Use a Collection
✅ Scenarios that suit a Collection:
The kind of information is not fixed
# Users may share any kind of information
memories = [
    "Lance likes coffee",
    "Lance visited Paris in 2020",
    "Lance is learning French",
    "Lance prefers morning meetings"
]
The data needs to keep growing
# New memories accumulate over time
day_1 = ["Lance likes biking"]
day_2 = ["Lance likes biking", "Lance tried Tartine"]
day_3 = ["Lance likes biking", "Lance tried Tartine", "Lance planning Japan trip"]
Each item needs to be managed independently
# Each memory can be updated or deleted on its own
store.delete(namespace, memory_id_1)  # delete a specific memory
store.put(namespace, memory_id_2, updated_content)  # update a specific memory
The data resembles a log or event stream
# A stream of user activity
events = [
    "User logged in",
    "User completed tutorial",
    "User made first purchase"
]
10.2 Combining Profile and Collection
A good practice is to use both together.
# Profile: structured basic information
profile_namespace = ("profile", user_id)
profile = {
    "user_name": "Lance",
    "user_location": "San Francisco",
    "created_at": "2024-01-01"
}
store.put(profile_namespace, "user_profile", profile)
# Collection: flexible supplementary information
collection_namespace = ("memories", user_id)
memories = [
    {"content": "Lance likes biking"},
    {"content": "Lance visited Tartine bakery"},
    {"content": "Lance is planning a Japan trip"}
]
for memory in memories:
    store.put(collection_namespace, str(uuid.uuid4()), memory)
Using both in the chatbot:
def call_model(state, config, store):
    user_id = config["configurable"]["user_id"]
    # Load the Profile (get() returns None if nothing has been saved yet)
    profile = store.get(("profile", user_id), "user_profile")
    if profile:
        profile_info = f"Name: {profile.value['user_name']}, Location: {profile.value['user_location']}"
    else:
        profile_info = "No profile saved yet."
    # Load the Collection
    memories = store.search(("memories", user_id))
    memories_info = "\n".join(f"- {m.value['content']}" for m in memories)
    # Combine the information
    system_msg = f"""You are a helpful assistant.
User Profile:
{profile_info}
Additional Memories:
{memories_info}"""
    response = model.invoke([SystemMessage(content=system_msg)] + state["messages"])
    return {"messages": response}
10.3 Memory Management Best Practices
1. Clean up regularly:
def cleanup_old_memories(store, user_id, days=90):
    """Delete memories older than N days"""
    from datetime import datetime, timedelta, timezone
    namespace = ("memories", user_id)
    memories = store.search(namespace)
    # Item timestamps are timezone-aware datetimes, so the cutoff must be aware as well
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
    for mem in memories:
        if mem.created_at < cutoff_date:
            store.delete(namespace, mem.key)
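The age check at the heart of the cleanup can be tested on its own. The sketch below assumes timestamps are timezone-aware `datetime` objects (the sample output in section 3.4 shows `+00:00` offsets); `expired_keys` is a hypothetical helper, and the dates are made up for the example.

```python
from datetime import datetime, timedelta, timezone

def expired_keys(items, days=90, now=None):
    """Return the keys of (key, created_at) pairs older than `days` days.

    created_at values must be timezone-aware; comparing an aware datetime
    with a naive one raises TypeError.
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    return [key for key, created_at in items if created_at < cutoff]

# A fixed "now" keeps the example deterministic.
now = datetime(2025, 1, 1, tzinfo=timezone.utc)
items = [
    ("old", datetime(2024, 6, 1, tzinfo=timezone.utc)),
    ("new", datetime(2024, 12, 15, tzinfo=timezone.utc)),
]
print(expired_keys(items, days=90, now=now))  # ['old']
```

Passing `now` explicitly is only a convenience for testing; in production the default of the current UTC time applies.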
2. Cap the number of memories:
MAX_MEMORIES = 100
def add_memory_with_limit(store, user_id, new_memory):
    namespace = ("memories", user_id)
    memories = store.search(namespace)
    if len(memories) >= MAX_MEMORIES:
        # Delete the oldest memory
        oldest = min(memories, key=lambda m: m.created_at)
        store.delete(namespace, oldest.key)
    # Add the new memory
    store.put(namespace, str(uuid.uuid4()), new_memory)
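3. Quality control for memories:
def is_valuable_memory(content: str) -> bool:
    """Decide whether a memory is worth keeping"""
    # Filter out memories that are too short
    if len(content) < 10:
        return False
    # Filter out filler phrases
    useless_phrases = ["hello", "hi", "ok", "thanks", "bye"]
    if content.lower() in useless_phrases:
        return False
    return True
# Usage in write_memory: keep json_doc_id so that updates still overwrite the existing item
for r, rmeta in zip(result["responses"], result["response_metadata"]):
    if is_valuable_memory(r.content):
        store.put(
            namespace,
            rmeta.get("json_doc_id", str(uuid.uuid4())),
            r.model_dump(mode="json"),
        )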
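11. Common Problems and Solutions
11.1 Too Many Memories Make the Context Too Long
Problem:
# Suppose there are 1000 memories
memories = store.search(namespace)
# Passing all of them to the LLM can exceed the context limit
Solutions:
Option 1: limit the number
# Use only the N most recent
recent_memories = sorted(memories, key=lambda m: m.updated_at, reverse=True)[:20]
Option 2: semantic search
# Use a vector store to find relevant memories
# (assumes langchain-community, langchain-openai and chromadb are installed)
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
# Embedding model (an assumption; any LangChain embeddings class would do)
embeddings = OpenAIEmbeddings()
# Build the vector store
vectorstore = Chroma.from_texts(
    [m.value['content'] for m in memories],
    embeddings
)
# Query for relevant memories (returns Document objects; the text is in doc.page_content)
current_query = state["messages"][-1].content
relevant_memories = vectorstore.similarity_search(current_query, k=10)
Option 3: layered summaries
# Summarize the older memories
old_memories_summary = summarize_memories(model, old_memories)
recent_memories_full = [m.value['content'] for m in recent_memories]
# Join outside the f-string: a backslash inside an f-string expression is a syntax error before Python 3.12
recent_text = "\n".join(recent_memories_full)
info = f"""Summary of older memories:
{old_memories_summary}
Recent memories:
{recent_text}"""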
11.2 Duplicate Memories
Problem:
# The user mentions the same thing several times
memories = [
    "Lance likes biking",
    "Lance enjoys biking",
    "Lance loves to bike"
]
Solutions:
State the requirement explicitly in the Trustcall instruction:
TRUSTCALL_INSTRUCTION = """Reflect on the following interaction.
Create or update memories, avoiding duplicates.
If a memory is similar to an existing one, UPDATE the existing memory instead of creating a new one.
Use parallel tool calling to handle updates and insertions simultaneously."""
Or deduplicate periodically:
def merge_similar_memories(model, store, user_id):
    namespace = ("memories", user_id)
    memories = store.search(namespace)
    # Ask the LLM to identify similar memories
    content_list = [m.value['content'] for m in memories]
    # Build the numbered list outside the f-string (backslashes in f-string expressions fail before Python 3.12)
    numbered = "\n".join(f"{i}. {c}" for i, c in enumerate(content_list))
    prompt = f"""Identify groups of similar memories that should be merged:
{numbered}
Output groups of similar memory indices."""
    result = model.invoke([HumanMessage(content=prompt)])
    # Process the result and merge...
11.3 Memory Updates Fail
Problem:
# Trustcall does not recognize which memory should be updated,
# so a new memory is created instead of the existing one being updated
Solutions:
Make sure existing_memories uses the right format:
# Correct
existing_memories = [
    (mem.key, "Memory", mem.value)  # use mem.key, not str(i)
    for mem in store.search(namespace)
]
# Incorrect (the returned json_doc_id will not match any store key)
existing_memories = [
    (str(i), "Memory", mem.value)  # an index instead of the real key
    for i, mem in enumerate(...)
]
Give a more explicit update instruction:
TRUSTCALL_INSTRUCTION = """Review the existing memories and the new conversation.
For each piece of information:
1. If it's related to an existing memory, UPDATE that memory
2. If it's completely new information, CREATE a new memory
Prefer updating existing memories over creating new ones when possible."""
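12. Performance Optimization
12.1 Batch Operations
Problem: saving memories one at a time is inefficient.
Optimization:
# Less efficient: one call per memory
for memory in memories:
    store.put(namespace, str(uuid.uuid4()), memory)
# Better, if the Store supports it: a batch API
items = [
    (namespace, str(uuid.uuid4()), mem.model_dump())
    for mem in result["responses"]
]
store.batch_put(items)  # hypothetical method; check which batch interface your store actually provides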
12.2 Caching
from datetime import datetime, timedelta
# Cache for memory queries
_memory_cache = {}
_cache_ttl = timedelta(minutes=5)
def get_memories_cached(store, user_id):
    cache_key = f"memories_{user_id}"
    if cache_key in _memory_cache:
        cached_data, cached_time = _memory_cache[cache_key]
        if datetime.now() - cached_time < _cache_ttl:
            return cached_data
    # Cache entry missing or expired: query again
    namespace = ("memories", user_id)
    memories = store.search(namespace)
    _memory_cache[cache_key] = (memories, datetime.now())
    return memories
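One thing a time-based cache leaves open is staleness right after a write: if new memories are saved, a cached list can be out of date for up to the TTL. The sketch below is a hedged illustration of a small cache with explicit invalidation; `TTLCache` and its injectable `clock` are hypothetical and exist only so the expiry logic can be checked without waiting.

```python
import time

class TTLCache:
    """A tiny time-to-live cache with explicit invalidation (illustrative only)."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so tests can control time
        self._data = {}

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if self.clock() - stored_at >= self.ttl:
            del self._data[key]  # expired
            return None
        return value

    def set(self, key, value):
        self._data[key] = (value, self.clock())

    def invalidate(self, key):
        """Drop an entry, for example right after new memories are written."""
        self._data.pop(key, None)

# A fake clock stands in for real time.
fake_now = [0.0]
cache = TTLCache(ttl_seconds=300, clock=lambda: fake_now[0])
cache.set("memories_1", ["Lance likes biking"])
hit = cache.get("memories_1")   # within the TTL
fake_now[0] = 301.0
miss = cache.get("memories_1")  # past the TTL
```

Calling `invalidate` for the user's key after each write keeps reads fresh while still saving repeated lookups between writes.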
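12.3 Asynchronous Processing
import asyncio
import uuid
async def write_memory_async(state, config, store):
    """Save memories asynchronously"""
    # ... obtain user_id, namespace, updated_messages and existing_memories as in write_memory ...
    # Call Trustcall asynchronously
    result = await trustcall_extractor.ainvoke({
        "messages": updated_messages,
        "existing": existing_memories
    })
    # Save concurrently: reuse json_doc_id for updates, generate a UUID for inserts
    tasks = [
        store.aput(
            namespace,
            rmeta.get("json_doc_id", str(uuid.uuid4())),
            r.model_dump(mode="json"),
        )
        for r, rmeta in zip(result["responses"], result["response_metadata"])
    ]
    await asyncio.gather(*tasks)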
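The concurrent-save pattern can be tried without a model or a real store. In the sketch below, `FakeAsyncStore` is a hypothetical stand-in whose `aput` merely mirrors the call shape used above, and `save_all` is an illustrative helper rather than library code.

```python
import asyncio
import uuid

class FakeAsyncStore:
    """Hypothetical stand-in for a store that exposes an async put."""

    def __init__(self):
        self.data = {}

    async def aput(self, namespace, key, value):
        await asyncio.sleep(0)  # yield control, as real I/O would
        self.data[(namespace, key)] = value

async def save_all(store, namespace, responses, response_metadata):
    """Schedule one write per response and wait for all of them together."""
    tasks = [
        store.aput(namespace, meta.get("json_doc_id", str(uuid.uuid4())), response)
        for response, meta in zip(responses, response_metadata)
    ]
    await asyncio.gather(*tasks)

store = FakeAsyncStore()
namespace = ("memories", "1")
responses = [{"content": "Lance likes biking."}, {"content": "Lance enjoys bakeries."}]
response_metadata = [{"id": "call_xxx", "json_doc_id": "dee65880"}, {"id": "call_yyy"}]

asyncio.run(save_all(store, namespace, responses, response_metadata))
print(len(store.data))  # 2
```

`asyncio.gather` only helps when the underlying writes actually wait on I/O; with an in-memory store the benefit is negligible.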
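13. Summary
13.1 Key Takeaways
A Collection is a set of independent memory items
- Each memory has a unique UUID key
- It can grow without a fixed limit
- It is flexible and open-ended
enable_inserts=True is the key parameter
- Allows Trustcall to insert new memories
- Still supports updating existing memories
- Enables updates and inserts in parallel
UUIDs manage the memory items
- uuid.uuid4() generates a unique identifier
- Supports independent create, read, update and delete
- Avoids key collisions
Trustcall decides between update and insert
- Determines automatically whether to update or insert
- json_doc_id marks an update
- Helps keep information consistent
Profile and Collection complement each other
- Profile: core information with a fixed structure
- Collection: supplementary information that grows flexibly
- The two work well in combination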
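13.2 Skills learned
Python skills:
- Generating and using UUIDs
- Generator expressions and list comprehensions
- Parallel iteration with zip()
- Default-value handling with .get()
- Using merge_message_runs()
LangChain/LangGraph skills:
- Defining a Collection schema
- The enable_inserts=True parameter
- Store search() vs get()
- Advanced Trustcall usage
- Formatting and displaying memories
System design:
- Choosing between Profile and Collection
- Memory management strategies
- Performance optimization techniques
- Scalability considerations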
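13.3 Profile vs Collection decision tree
              Need to store information
                         ↓
              Is the structure fixed?
                         ↓
        Yes                             No
         ↓                               ↓
Is the number of                Does the information
fields bounded?                 keep growing?
         ↓                               ↓
   Yes         No                 Yes          No
    ↓           ↓                  ↓            ↓
 Profile    Consider          Collection    Consider
            Profile +                       Profile
            Collection                      (if the structure
                                            really is fixed)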
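The decision tree can also be written as a small function, which makes the four outcomes easy to check. This is only a sketch of the tree above; the function name `choose_memory_schema` and its parameter names are made up for illustration.

```python
def choose_memory_schema(structure_is_fixed: bool,
                         fields_are_bounded: bool = True,
                         keeps_growing: bool = True) -> str:
    """Walk the decision tree and return the suggested memory schema."""
    if structure_is_fixed:
        # Fixed structure: a bounded set of fields fits a Profile on its own.
        return "Profile" if fields_are_bounded else "Profile + Collection"
    # Open structure: ongoing growth is what tips the choice toward a Collection.
    return "Collection" if keeps_growing else "Profile"

print(choose_memory_schema(True, fields_are_bounded=True))    # Profile
print(choose_memory_schema(True, fields_are_bounded=False))   # Profile + Collection
print(choose_memory_schema(False, keeps_growing=True))        # Collection
print(choose_memory_schema(False, keeps_growing=False))       # Profile
```

The last branch mirrors the tree's caveat: if the information is neither fixed nor growing, a Profile is worth considering only after confirming that the structure really is stable.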
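13.4 Best practices summary
Scenario | Recommendation |
---|---|
Purely structured data | Use Profile |
Purely open-ended data | Use Collection |
Mixed scenarios | Profile + Collection |
Large numbers of memories | Collection + filtering/summarization |
Performance-critical | Caching + batch operations |
Search required | Collection + vector database |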
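13.5 Next steps
Advanced topics:
- Vector search: retrieve relevant memories using embeddings and a vector database
- Memory graphs: build a network of relationships between memories
- Automatic archiving: manage the lifecycle of memories intelligently
- Multimodal memory: store images, audio, and other media in addition to text
Practical applications:
- Personal AI assistant: combine Profile and Collection
- Customer service bot: record user history and preferences
- Educational tutoring: track a student's learning progress
- Content recommendation: personalized recommendations based on memories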
14. Appendix
A.1 Complete code listing
# Import dependencies
from IPython.display import Image, display
import uuid
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.memory import InMemoryStore
from langchain_core.messages import merge_message_runs, HumanMessage, SystemMessage
from langchain_core.runnables.config import RunnableConfig
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore
from langchain_openai import ChatOpenAI
from trustcall import create_extractor
from pydantic import BaseModel, Field
# Initialize the model
model = ChatOpenAI(model="gpt-4o", temperature=0)
# Memory schema
class Memory(BaseModel):
    content: str = Field(
        description="The main content of the memory. For example: User expressed interest in learning about French."
    )
# Create the Trustcall extractor
trustcall_extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True,
)
# System prompts
MODEL_SYSTEM_MESSAGE = """You are a helpful chatbot. You are designed to be a companion to a user.
You have a long term memory which keeps track of information you learn about the user over time.
Current Memory (may include updated memories from this conversation):
{memory}"""
TRUSTCALL_INSTRUCTION = """Reflect on following interaction.
Use the provided tools to retain any necessary memories about the user.
Use parallel tool calling to handle updates and insertions simultaneously:"""
# Define the nodes
def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):
    # Load this user's memories and inject them into the system prompt
    user_id = config["configurable"]["user_id"]
    namespace = ("memories", user_id)
    memories = store.search(namespace)
    info = "\n".join(f"- {mem.value['content']}" for mem in memories)
    system_msg = MODEL_SYSTEM_MESSAGE.format(memory=info)
    response = model.invoke([SystemMessage(content=system_msg)] + state["messages"])
    return {"messages": response}
def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    namespace = ("memories", user_id)
    existing_items = store.search(namespace)
    # Pass existing memories to Trustcall as (key, tool name, value) tuples
    tool_name = "Memory"
    existing_memories = (
        [(existing_item.key, tool_name, existing_item.value)
         for existing_item in existing_items]
        if existing_items
        else None
    )
    updated_messages = list(merge_message_runs(
        messages=[SystemMessage(content=TRUSTCALL_INSTRUCTION)] + state["messages"]
    ))
    result = trustcall_extractor.invoke({
        "messages": updated_messages,
        "existing": existing_memories
    })
    # Updates reuse their json_doc_id as the key; inserts get a new UUID
    for r, rmeta in zip(result["responses"], result["response_metadata"]):
        store.put(
            namespace,
            rmeta.get("json_doc_id", str(uuid.uuid4())),
            r.model_dump(mode="json"),
        )
# Build the graph
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)
# Compile the graph
across_thread_memory = InMemoryStore()
within_thread_memory = MemorySaver()
graph = builder.compile(
    checkpointer=within_thread_memory,
    store=across_thread_memory
)
# Usage example
config = {"configurable": {"thread_id": "1", "user_id": "1"}}
input_messages = [HumanMessage(content="Hi, my name is Lance")]
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
A.2 Quick reference for common operations
# Create a Collection extractor
extractor = create_extractor(
    model,
    tools=[Memory],
    tool_choice="Memory",
    enable_inserts=True  # the key setting
)
# Extract memories (create)
result = extractor.invoke({
    "messages": [...]
})
# Extract memories (update + insert)
# "existing" goes in the same input dict as "messages"
result = extractor.invoke({
    "messages": [...],
    "existing": [(key, "Memory", data), ...]
})
# Save memories to the Store
for r, rmeta in zip(result["responses"], result["response_metadata"]):
    store.put(
        namespace,
        rmeta.get("json_doc_id", str(uuid.uuid4())),
        r.model_dump(mode="json")
    )
# Retrieve all memories
memories = store.search(namespace)
# Format memories
info = "\n".join(f"- {m.value['content']}" for m in memories)
# Delete a specific memory
store.delete(namespace, memory_key)
A.3 Profile + Collection template
def call_model_hybrid(state, config, store):
    """Use a Profile and a Collection together."""
    user_id = config["configurable"]["user_id"]
    # Load the Profile
    profile = store.get(("profile", user_id), "user_profile")
    if profile:
        profile_info = f"""User Profile:
- Name: {profile.value['user_name']}
- Location: {profile.value['user_location']}"""
    else:
        profile_info = "No profile information available."
    # Load the Collection
    memories = store.search(("memories", user_id))
    if memories:
        memories_info = "Additional Memories:\n" + "\n".join(
            f"- {m.value['content']}" for m in memories
        )
    else:
        memories_info = "No additional memories available."
    # Combine both sources into the system prompt
    system_msg = f"""You are a helpful assistant.
{profile_info}
{memories_info}
Use this information to personalize your responses."""
    response = model.invoke([SystemMessage(content=system_msg)] + state["messages"])
    return {"messages": response}
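The template indexes `profile.value['user_name']` directly, which raises a KeyError if a stored profile is missing that field. A more forgiving variant of the formatting step, using the `.get()` default-value pattern mentioned in section 13.2, might look like the sketch below. The helper name `format_profile` and the "unknown" placeholder are assumptions for illustration, and the function takes the plain dict (`profile.value`) rather than the store item.

```python
def format_profile(profile_value):
    """Format a profile dict for the system prompt, tolerating missing fields."""
    if not profile_value:
        return "No profile information available."
    # .get() supplies a placeholder instead of raising KeyError
    name = profile_value.get("user_name", "unknown")
    location = profile_value.get("user_location", "unknown")
    return f"User Profile:\n- Name: {name}\n- Location: {location}"

print(format_profile({"user_name": "Lance"}))
print(format_profile(None))
```

Inside `call_model_hybrid`, this could be called as `format_profile(profile.value if profile else None)`, keeping the rest of the template unchanged.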
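Document version: 1.0 Last updated: 2024-11-05 Author: AI Assistant Based on: LangChain Academy Module-5 Lesson 5.4
Congratulations on completing all of Module-5! I hope this detailed walkthrough helps you master LangGraph's Collection schema and memory management system! 🎉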