LangGraph 消息过滤与裁剪详细解读
📚 概述
本文档详细解读 LangGraph 中的 消息过滤(Message Filtering) 和 消息裁剪(Message Trimming) 技术。这两种技术是构建长期记忆聊天机器人的核心技能,帮助我们高效管理对话历史,避免 token 浪费和延迟问题。
在实际应用中,聊天机器人的对话可能会持续很长时间,如果不加控制地将所有历史消息传递给 LLM,会导致:
- Token 成本飙升:每次调用都传递完整历史
- 响应延迟增加:LLM 需要处理大量上下文
- 上下文窗口溢出:超过模型的最大 token 限制
本教程将展示三种解决方案,帮助你优雅地管理对话状态。
🎯 核心概念
消息管理的挑战
在长期运行的对话中,我们面临的核心问题是:
用户: Hi
Bot: Hi! How can I help?
用户: I'm researching whales
Bot: Great! Tell me more.
用户: What are other ocean mammals?
Bot: [这里需要完整上下文吗?]
用户: Tell me about dolphins
Bot: [还需要第一轮对话吗?]
... (100 轮对话后)
用户: What's the weather?
Bot: [需要前面 99 轮对话吗?❌]
2
3
4
5
6
7
8
9
10
11
三种解决方案对比
方案 | 原理 | 修改状态 | 适用场景 |
---|---|---|---|
RemoveMessage | 从状态中删除旧消息 | ✅ 是 | 永久删除不需要的历史 |
消息过滤 | 传递消息子集给 LLM | ❌ 否 | 保留完整历史,但限制 LLM 可见范围 |
消息裁剪 | 基于 token 数量智能截断 | ❌ 否 | 精确控制 token 使用 |
🎭 实战案例:海洋哺乳动物研究助手
我们将构建一个聊天机器人,演示三种消息管理技术的应用。
场景设定
用户正在研究海洋哺乳动物,进行了多轮对话:
- 打招呼
- 讨论鲸鱼
- 询问其他海洋哺乳动物
- 深入了解独角鲸
- 询问虎鲸栖息地
我们需要在保持上下文连贯性的同时,控制 token 使用。
🔧 方案一:使用 RemoveMessage 删除消息
核心思路
使用 RemoveMessage
和 add_messages
reducer,从状态中永久删除旧消息。
代码实现
1. 基础准备
from pprint import pprint
from langchain_core.messages import AIMessage, HumanMessage
from langchain_openai import ChatOpenAI
# 初始化 LLM
llm = ChatOpenAI(model="gpt-4o")
# 创建对话历史
messages = [AIMessage("So you said you were researching ocean mammals?", name="Bot")]
messages.append(HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance"))
# 查看消息
for m in messages:
m.pretty_print()
2
3
4
5
6
7
8
9
10
11
12
13
14
Python 知识点:消息对象
LangChain 使用专门的消息类来区分对话角色:
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
# HumanMessage - 用户输入
user_msg = HumanMessage(content="Hello!", name="Alice")
# AIMessage - AI 回复
ai_msg = AIMessage(content="Hi there!", name="Bot")
# SystemMessage - 系统指令
sys_msg = SystemMessage(content="You are a helpful assistant")
2
3
4
5
6
7
8
9
10
重要属性:
content
:消息内容name
:发送者名称(可选)id
:唯一标识符(用于 RemoveMessage)
2. 定义过滤节点
from langchain_core.messages import RemoveMessage
from langgraph.graph import MessagesState
def filter_messages(state: MessagesState):
"""删除除最后 2 条消息外的所有消息"""
# 获取要删除的消息(前 N-2 条)
delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
return {"messages": delete_messages}
2
3
4
5
6
7
8
LangGraph 知识点:RemoveMessage
RemoveMessage
是一个特殊的消息类型,告诉 add_messages
reducer 删除指定消息:
RemoveMessage(id="message_id_123")
# ^^^^^^^^^^^^^^^^^^^
# 要删除的消息的 ID
2
3
工作原理:
# 初始状态
state["messages"] = [msg1, msg2, msg3, msg4]
# filter_messages 返回
{"messages": [RemoveMessage(id=msg1.id), RemoveMessage(id=msg2.id)]}
# add_messages reducer 处理后
state["messages"] = [msg3, msg4] # msg1 和 msg2 被删除
2
3
4
5
6
7
8
关键: add_messages
reducer 会识别 RemoveMessage
,并从状态中删除对应的消息。
3. 定义聊天节点
def chat_model_node(state: MessagesState):
"""调用 LLM 生成回复"""
return {"messages": [llm.invoke(state["messages"])]}
2
3
注意: 返回 {"messages": [ai_message]}
而不是 {"messages": ai_message}
,因为 add_messages
reducer 期望列表。
4. 构建图
from langgraph.graph import StateGraph, START, END
# 创建图
builder = StateGraph(MessagesState)
# 添加节点
builder.add_node("filter", filter_messages)
builder.add_node("chat_model", chat_model_node)
# 添加边
builder.add_edge(START, "filter")
builder.add_edge("filter", "chat_model")
builder.add_edge("chat_model", END)
# 编译
graph = builder.compile()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
执行流程:
START → filter(删除旧消息)→ chat_model(LLM 回复)→ END
5. 执行测试
# 创建带 ID 的消息(RemoveMessage 需要 ID)
messages = [
AIMessage("Hi.", name="Bot", id="1"),
HumanMessage("Hi.", name="Lance", id="2"),
AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"),
HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4")
]
# 调用图
output = graph.invoke({'messages': messages})
# 查看结果
for m in output['messages']:
m.pretty_print()
2
3
4
5
6
7
8
9
10
11
12
13
14
预期结果:
# 只保留最后 2 条消息 + LLM 回复
================================== Ai Message ==================================
Name: Bot
So you said you were researching ocean mammals?
================================ Human Message =================================
Name: Lance
Yes, I know about whales. But what others should I learn about?
================================== Ai Message ==================================
Here are some ocean mammals: dolphins, seals, sea lions, ...
2
3
4
5
6
7
8
9
10
11
12
注意: 前两条消息(id="1" 和 id="2")已被永久删除!
Python 知识点:列表切片
messages = [msg1, msg2, msg3, msg4, msg5]
# 取最后 2 条
messages[-2:] # → [msg4, msg5]
# 除了最后 2 条的所有消息
messages[:-2] # → [msg1, msg2, msg3]
# 第一条
messages[:1] # → [msg1]
# 最后一条
messages[-1:] # → [msg5]
2
3
4
5
6
7
8
9
10
11
12
13
🔧 方案二:消息过滤(不修改状态)
核心思路
不修改图状态,只在传递给 LLM 时过滤消息。状态保留完整历史,但 LLM 只看到部分消息。
代码实现
1. 修改聊天节点
def chat_model_node(state: MessagesState):
"""只传递最后一条消息给 LLM"""
return {"messages": [llm.invoke(state["messages"][-1:])]}
# ^^^^^^^^^^^^^^^^^^^
# 只取最后一条消息
2
3
4
5
2. 简化的图结构
# 创建图(无需 filter 节点!)
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()
2
3
4
5
6
对比方案一:
特性 | RemoveMessage | 消息过滤 |
---|---|---|
节点数量 | 2 个(filter + chat) | 1 个(chat) |
状态修改 | 永久删除 | 保持不变 |
历史可用性 | 历史丢失 | 历史完整保留 |
3. 测试多轮对话
# 初始消息
messages = [
AIMessage("So you said you were researching ocean mammals?", name="Bot"),
HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance")
]
# 第一轮调用
output = graph.invoke({'messages': messages})
# 追加 LLM 回复和新问题
messages.append(output['messages'][-1])
messages.append(HumanMessage("Tell me more about Narwhals!", name="Lance"))
# 第二轮调用
output = graph.invoke({'messages': messages})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
关键观察:
虽然 messages
包含完整历史,但 LLM 每次只看到最后一条消息!
# messages 实际内容:
[
AIMessage("...ocean mammals?"),
HumanMessage("...whales..."),
AIMessage("...dolphins, seals..."),
HumanMessage("Tell me more about Narwhals!") # ← LLM 只看到这条
]
2
3
4
5
6
7
在 LangSmith 中验证
查看 LangSmith 追踪,你会发现 LLM 调用的 messages
参数只包含最后一条消息,而不是完整历史。
优势:
- 状态完整保留(可用于日志、分析)
- LLM token 使用受控
- 实现简单(无需额外节点)
劣势:
- LLM 缺乏上下文(可能导致回答不连贯)
- 需要手动管理需要传递的消息范围
🔧 方案三:智能裁剪(基于 Token)
核心思路
使用 LangChain 的 trim_messages
工具,基于 token 数量 智能裁剪消息,而不是简单地按数量过滤。
为什么需要 Token 裁剪?
不同消息的 token 数量差异很大:
msg1 = HumanMessage("Hi") # ~1 token
msg2 = AIMessage("Hello! How can I help?") # ~5 tokens
msg3 = HumanMessage("Tell me about whales... (1000 字)") # ~1500 tokens
2
3
如果简单过滤"最后 2 条消息":
- 可能是 6 tokens(msg1 + msg2)
- 也可能是 1505 tokens(msg2 + msg3)
Token 裁剪确保精确控制!
代码实现
1. 导入裁剪工具
from langchain_core.messages import trim_messages
2. 修改聊天节点
def chat_model_node(state: MessagesState):
"""使用 token 裁剪"""
# 裁剪消息到最多 100 tokens
messages = trim_messages(
state["messages"], # 原始消息列表
max_tokens=100, # 最大 token 数
strategy="last", # 保留最后的消息
token_counter=ChatOpenAI(model="gpt-4o"), # Token 计数器
allow_partial=False, # 不允许截断消息
)
return {"messages": [llm.invoke(messages)]}
2
3
4
5
6
7
8
9
10
11
LangChain 知识点:trim_messages 参数详解
trim_messages(
messages, # 要裁剪的消息列表
max_tokens=100, # 最大 token 数
strategy="last", # 裁剪策略(见下方)
token_counter=model, # Token 计数器(通常是模型实例)
allow_partial=False, # 是否允许部分截断单条消息
)
2
3
4
5
6
7
裁剪策略(strategy):
策略 | 说明 | 示例 |
---|---|---|
"last" | 保留最后的消息 | 适合聊天机器人(保留最近对话) |
"first" | 保留最开始的消息 | 适合保留系统指令 |
allow_partial 参数:
# allow_partial=False(默认)
# 如果某条消息超过剩余 token,直接丢弃整条消息
messages = [msg1(50 tokens), msg2(60 tokens), msg3(40 tokens)]
trim_messages(messages, max_tokens=100, allow_partial=False)
# → [msg2, msg3] # msg1 被丢弃,因为 msg2+msg3=100 tokens
# allow_partial=True
# 会截断消息以精确达到 max_tokens
trim_messages(messages, max_tokens=100, allow_partial=True)
# → [msg1(部分截断), msg2, msg3]
2
3
4
5
6
7
8
9
10
3. 测试裁剪效果
# 准备长对话历史
messages = [
AIMessage("Hi.", name="Bot", id="1"),
HumanMessage("Hi.", name="Lance", id="2"),
AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"),
HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4"),
AIMessage("Consider dolphins, seals, sea lions...", name="Bot", id="5"),
HumanMessage("Tell me where Orcas live!", name="Lance", id="6")
]
# 测试裁剪(不调用 LLM)
trimmed = trim_messages(
messages,
max_tokens=100,
strategy="last",
token_counter=ChatOpenAI(model="gpt-4o"),
allow_partial=False
)
print(f"原始消息数: {len(messages)}")
print(f"裁剪后消息数: {len(trimmed)}")
for m in trimmed:
m.pretty_print()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
预期输出:
根据 token 计算,可能只保留最后 2-3 条消息,确保总数不超过 100 tokens。
4. 在图中使用
# 构建图
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()
# 调用
output = graph.invoke({'messages': messages})
2
3
4
5
6
7
8
9
验证: 在 LangSmith 中查看追踪,确认 LLM 收到的消息总 token 数 ≤ 100。
Python 知识点:函数参数传递
# trim_messages 接收模型实例作为 token_counter
token_counter=ChatOpenAI(model="gpt-4o")
# 内部调用模型的 token 计数方法
# model.get_num_tokens_from_messages(messages)
2
3
4
5
为什么需要模型实例?
不同模型的 tokenizer 不同:
- GPT-4:使用 cl100k_base encoding
- GPT-3.5:使用 cl100k_base encoding
- Claude:使用不同的 tokenizer
必须使用对应模型的 tokenizer 才能准确计数!
🎓 核心知识点总结
LangGraph 特有概念
1. MessagesState
预定义的状态模式,专为聊天应用设计:
from langgraph.graph import MessagesState
class MessagesState(TypedDict):
messages: Annotated[list, add_messages]
# ^^^^^^^^^^^^^^^^^^^^^^^
# 自动使用 add_messages reducer
2
3
4
5
6
优势:
- 自动处理消息追加
- 支持
RemoveMessage
- 简化状态定义
2. add_messages Reducer
内置的智能消息 reducer:
# 普通追加
state["messages"] + [new_message] # 追加消息
# 识别 RemoveMessage
state["messages"] + [RemoveMessage(id="123")] # 删除消息
# 自动去重(基于 ID)
state["messages"] + [existing_message] # 如果 ID 相同,不重复添加
2
3
4
5
6
7
8
3. 三种消息管理模式对比
模式 | 状态修改 | Token 控制 | 历史保留 | 复杂度 | 适用场景 |
---|---|---|---|---|---|
RemoveMessage | ✅ 永久删除 | 间接(删除消息) | ❌ 部分丢失 | 中 | 确定不需要历史时 |
消息过滤 | ❌ 不修改 | ✅ 精确(按数量) | ✅ 完整保留 | 低 | 简单场景,固定数量 |
Token 裁剪 | ❌ 不修改 | ✅✅ 非常精确(按 token) | ✅ 完整保留 | 中 | 需要精确控制成本 |
Python 特有知识点
1. 消息类的层次结构
BaseMessage (基类)
├── HumanMessage (用户消息)
├── AIMessage (AI 回复)
├── SystemMessage (系统指令)
├── FunctionMessage (函数调用结果)
└── RemoveMessage (删除标记)
2
3
4
5
6
继承关系:
from langchain_core.messages import BaseMessage, HumanMessage
# HumanMessage 继承自 BaseMessage
isinstance(HumanMessage("Hi"), BaseMessage) # True
2
3
4
2. 列表切片的高级用法
messages = [msg1, msg2, msg3, msg4, msg5]
# 负数索引
messages[-1] # 最后一个元素
messages[-2:] # 最后两个元素
# 步长切片
messages[::2] # 每隔一个取一个: [msg1, msg3, msg5]
messages[::-1] # 反转列表
# 省略参数
messages[2:] # 从第3个到结尾
messages[:3] # 从开头到第3个(不含)
messages[:] # 完整复制
2
3
4
5
6
7
8
9
10
11
12
13
14
3. 列表推导式
# 基础形式
[RemoveMessage(id=m.id) for m in messages[:-2]]
# 等价于:
result = []
for m in messages[:-2]:
result.append(RemoveMessage(id=m.id))
2
3
4
5
6
7
带条件的列表推导:
# 只删除 HumanMessage
[RemoveMessage(id=m.id) for m in messages if isinstance(m, HumanMessage)]
# 删除超过 100 tokens 的消息
[RemoveMessage(id=m.id) for m in messages if get_token_count(m) > 100]
2
3
4
5
💡 最佳实践
1. 何时使用哪种方案?
使用 RemoveMessage 的场景
✅ 适用:
- 确定旧消息不再需要(如已完成的任务)
- 需要永久减少状态大小(节省内存)
- 实现"忘记"机制(如定期清理 7 天前的对话)
❌ 不适用:
- 可能需要回溯历史
- 需要保留完整日志
- 调试阶段(删除后无法恢复)
示例:
def cleanup_old_messages(state: MessagesState):
"""删除 7 天前的消息"""
cutoff = datetime.now() - timedelta(days=7)
to_remove = [
RemoveMessage(id=m.id)
for m in state["messages"]
if m.timestamp < cutoff
]
return {"messages": to_remove}
2
3
4
5
6
7
8
9
使用消息过滤的场景
✅ 适用:
- 快速原型开发
- 简单的"最近 N 条消息"逻辑
- 不想修改状态(方便回滚)
❌ 不适用:
- 需要精确控制 token 成本
- 消息长度差异很大
示例:
# 场景:客服机器人只需要最近 5 轮对话
def chat_node(state: MessagesState):
recent_messages = state["messages"][-10:] # 5 轮 = 10 条消息(用户+AI)
return {"messages": [llm.invoke(recent_messages)]}
2
3
4
使用 Token 裁剪的场景
✅ 适用:
- 生产环境(需要严格成本控制)
- 消息长度不可预测
- 需要充分利用上下文窗口(如接近 4096 tokens 限制)
❌ 不适用:
- 开发早期(增加复杂度)
- 消息数量固定且短
示例:
# 场景:文档问答,用户可能粘贴长文本
def qa_node(state: MessagesState):
# 保留系统指令 + 尽可能多的对话历史
messages = trim_messages(
state["messages"],
max_tokens=3500, # 为 LLM 回复预留 500 tokens
strategy="last",
token_counter=llm,
allow_partial=False
)
return {"messages": [llm.invoke(messages)]}
2
3
4
5
6
7
8
9
10
11
2. 组合使用多种技术
最佳实践: 在实际应用中,通常组合使用多种技术。
示例:智能客服系统
def intelligent_chat_node(state: MessagesState):
messages = state["messages"]
# 第 1 步:删除系统消息以外的过期消息(7 天前)
# (在单独的 cleanup 节点中处理)
# 第 2 步:保留系统指令
system_messages = [m for m in messages if isinstance(m, SystemMessage)]
conversation_messages = [m for m in messages if not isinstance(m, SystemMessage)]
# 第 3 步:Token 裁剪对话历史
trimmed_conversation = trim_messages(
conversation_messages,
max_tokens=2000,
strategy="last",
token_counter=llm,
allow_partial=False
)
# 第 4 步:重组消息(系统指令 + 裁剪后的对话)
final_messages = system_messages + trimmed_conversation
# 第 5 步:调用 LLM
return {"messages": [llm.invoke(final_messages)]}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
3. 状态管理技巧
技巧 1:始终保留系统消息
def trim_with_system(messages, max_tokens):
# 分离系统消息和对话消息
system_msgs = [m for m in messages if isinstance(m, SystemMessage)]
other_msgs = [m for m in messages if not isinstance(m, SystemMessage)]
# 计算系统消息的 token 数
system_tokens = sum(llm.get_num_tokens(m.content) for m in system_msgs)
# 裁剪对话消息(减去系统消息占用的 tokens)
trimmed = trim_messages(
other_msgs,
max_tokens=max_tokens - system_tokens,
strategy="last",
token_counter=llm
)
return system_msgs + trimmed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
技巧 2:保留重要消息
def trim_with_pinned(state: MessagesState, max_tokens):
messages = state["messages"]
# 标记重要消息(假设通过 metadata)
pinned = [m for m in messages if m.additional_kwargs.get("pinned")]
unpinned = [m for m in messages if not m.additional_kwargs.get("pinned")]
# 只裁剪非重要消息
pinned_tokens = sum(llm.get_num_tokens(m.content) for m in pinned)
trimmed = trim_messages(
unpinned,
max_tokens=max_tokens - pinned_tokens,
strategy="last",
token_counter=llm
)
# 重新排序(保持时间顺序)
all_messages = sorted(pinned + trimmed, key=lambda m: m.timestamp)
return {"messages": all_messages}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
技巧 3:动态调整裁剪策略
def adaptive_trim(state: MessagesState):
messages = state["messages"]
conversation_length = len(messages)
# 根据对话长度动态调整 max_tokens
if conversation_length < 10:
max_tokens = 1000 # 短对话:保留更多上下文
elif conversation_length < 50:
max_tokens = 500 # 中等对话:适度裁剪
else:
max_tokens = 200 # 长对话:激进裁剪
trimmed = trim_messages(
messages,
max_tokens=max_tokens,
strategy="last",
token_counter=llm
)
return {"messages": [llm.invoke(trimmed)]}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
🚀 进阶技巧
1. 实现滑动窗口
保留"最近 N 条消息 + 最开始的系统指令":
def sliding_window_trim(messages, window_size=10):
if len(messages) <= window_size:
return messages
# 假设第一条是系统指令
system_msg = messages[0] if isinstance(messages[0], SystemMessage) else None
if system_msg:
# 保留系统指令 + 最后 N-1 条消息
return [system_msg] + messages[-(window_size-1):]
else:
# 只保留最后 N 条
return messages[-window_size:]
2
3
4
5
6
7
8
9
10
11
12
13
2. 基于摘要的历史压缩
对于超长对话,可以定期生成摘要:
class ConversationState(TypedDict):
messages: Annotated[list, add_messages]
summary: str # 对话摘要
def summarize_old_messages(state: ConversationState):
messages = state["messages"]
# 如果消息超过 50 条,生成摘要
if len(messages) > 50:
# 摘要前 40 条消息
to_summarize = messages[:40]
summary_prompt = f"Summarize this conversation:\n{to_summarize}"
summary = llm.invoke(summary_prompt).content
# 删除已摘要的消息
to_remove = [RemoveMessage(id=m.id) for m in to_summarize]
return {
"messages": to_remove,
"summary": summary # 保存摘要
}
return {}
def chat_with_summary(state: ConversationState):
# 构建上下文:摘要 + 最近消息
context = []
if state.get("summary"):
context.append(SystemMessage(f"Previous conversation summary: {state['summary']}"))
context.extend(state["messages"])
return {"messages": [llm.invoke(context)]}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
3. 按角色过滤
只保留特定角色的消息:
def filter_by_role(messages, keep_roles=["human", "ai"]):
"""只保留用户和 AI 的消息,移除系统消息"""
return [
m for m in messages
if m.type in keep_roles
]
# 使用
def chat_node(state: MessagesState):
# 只传递用户和 AI 的对话给 LLM
filtered = filter_by_role(state["messages"], keep_roles=["human", "ai"])
return {"messages": [llm.invoke(filtered)]}
2
3
4
5
6
7
8
9
10
11
12
4. 智能去重
移除重复或相似的消息:
def deduplicate_messages(messages):
"""移除内容完全相同的连续消息"""
if not messages:
return messages
deduplicated = [messages[0]]
for msg in messages[1:]:
if msg.content != deduplicated[-1].content:
deduplicated.append(msg)
return deduplicated
2
3
4
5
6
7
8
9
10
11
📊 性能对比
Token 使用对比
假设 10 轮对话,每轮平均 100 tokens:
方案 | Token 使用 | 成本(GPT-4 输入) | 延迟 |
---|---|---|---|
无裁剪 | 1000 tokens | $0.03 | 高 |
过滤最后 5 条 | ~500 tokens | $0.015 | 中 |
裁剪到 200 tokens | 200 tokens | $0.006 | 低 |
实测示例
import time
# 准备长对话(100 条消息)
long_conversation = [
HumanMessage(f"Message {i}") if i % 2 == 0 else AIMessage(f"Response {i}")
for i in range(100)
]
# 方案 1:无裁剪
start = time.time()
response1 = llm.invoke(long_conversation)
time1 = time.time() - start
# 方案 2:过滤最后 10 条
start = time.time()
response2 = llm.invoke(long_conversation[-10:])
time2 = time.time() - start
# 方案 3:Token 裁剪到 500
start = time.time()
trimmed = trim_messages(long_conversation, max_tokens=500, token_counter=llm)
response3 = llm.invoke(trimmed)
time3 = time.time() - start
print(f"无裁剪: {time1:.2f}s")
print(f"过滤: {time2:.2f}s (快 {(time1-time2)/time1*100:.0f}%)")
print(f"裁剪: {time3:.2f}s (快 {(time1-time3)/time1*100:.0f}%)")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
🎯 实际应用案例
案例 1:客服聊天机器人
class CustomerServiceState(TypedDict):
messages: Annotated[list, add_messages]
customer_id: str
session_start: datetime
def customer_service_chat(state: CustomerServiceState):
"""客服机器人:保留完整历史但限制 LLM 可见范围"""
messages = state["messages"]
# 1. 始终保留系统指令(客服规范)
system_msg = SystemMessage("You are a helpful customer service agent. Be polite and professional.")
# 2. 裁剪对话历史到 1500 tokens(约 10-15 轮对话)
trimmed = trim_messages(
messages,
max_tokens=1500,
strategy="last",
token_counter=llm,
allow_partial=False
)
# 3. 组合系统指令和裁剪后的对话
final_messages = [system_msg] + trimmed
return {"messages": [llm.invoke(final_messages)]}
# 构建图
builder = StateGraph(CustomerServiceState)
builder.add_node("chat", customer_service_chat)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)
graph = builder.compile()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
案例 2:文档问答助手
def document_qa_chat(state: MessagesState):
"""文档问答:用户可能粘贴长文档"""
messages = state["messages"]
# 1. 识别长消息(超过 1000 tokens 的)
long_messages = []
short_messages = []
for m in messages:
token_count = llm.get_num_tokens(m.content)
if token_count > 1000:
# 截断长消息
truncated_content = m.content[:2000] + "... [truncated]"
long_messages.append(m.__class__(content=truncated_content))
else:
short_messages.append(m)
# 2. 裁剪短消息
trimmed_short = trim_messages(
short_messages,
max_tokens=1500,
strategy="last",
token_counter=llm
)
# 3. 组合(保留最近的长消息 + 裁剪后的短消息)
final_messages = (long_messages[-1:] if long_messages else []) + trimmed_short
return {"messages": [llm.invoke(final_messages)]}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
案例 3:定期清理旧消息
from datetime import datetime, timedelta
class TimedState(TypedDict):
messages: Annotated[list, add_messages]
last_cleanup: datetime
def cleanup_node(state: TimedState):
"""每 24 小时清理一次旧消息"""
now = datetime.now()
last_cleanup = state.get("last_cleanup", now)
# 如果距离上次清理超过 24 小时
if (now - last_cleanup).total_seconds() > 86400:
messages = state["messages"]
cutoff = now - timedelta(days=7)
# 删除 7 天前的消息
to_remove = [
RemoveMessage(id=m.id)
for m in messages
if hasattr(m, 'timestamp') and m.timestamp < cutoff
]
return {
"messages": to_remove,
"last_cleanup": now
}
return {}
# 在图中添加清理节点
builder.add_node("cleanup", cleanup_node)
builder.add_node("chat", chat_node)
builder.add_edge(START, "cleanup")
builder.add_edge("cleanup", "chat")
builder.add_edge("chat", END)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
🔍 常见问题
Q1: RemoveMessage 删除后能恢复吗?
不能! RemoveMessage
是永久操作。如果需要保留历史:
# ❌ 错误:直接删除
return {"messages": [RemoveMessage(id=msg.id)]}
# ✅ 正确:先备份
archived_messages = state["messages"][:-10] # 保存到数据库
to_remove = [RemoveMessage(id=m.id) for m in archived_messages]
return {"messages": to_remove}
2
3
4
5
6
7
Q2: trim_messages 会修改原始消息吗?
不会! trim_messages
返回新列表,不修改输入:
original = [msg1, msg2, msg3]
trimmed = trim_messages(original, max_tokens=50, token_counter=llm)
# original 仍然是 [msg1, msg2, msg3]
# trimmed 可能是 [msg2, msg3](新列表)
2
3
4
5
Q3: 如何处理系统消息?
最佳实践: 始终保留系统消息,单独处理:
def smart_trim(messages, max_tokens):
system = [m for m in messages if isinstance(m, SystemMessage)]
others = [m for m in messages if not isinstance(m, SystemMessage)]
# 只裁剪非系统消息
system_tokens = sum(llm.get_num_tokens(m.content) for m in system)
trimmed_others = trim_messages(
others,
max_tokens=max_tokens - system_tokens,
token_counter=llm
)
return system + trimmed_others
2
3
4
5
6
7
8
9
10
11
12
13
Q4: allow_partial=True 会截断到什么位置?
答: 截断到恰好达到 max_tokens
的位置:
message = HumanMessage("This is a very long message with many words...")
# 假设这条消息有 150 tokens
trimmed = trim_messages(
[message],
max_tokens=100,
allow_partial=True,
token_counter=llm
)
# 结果:消息被截断到约 100 tokens
# "This is a very long message with..."
2
3
4
5
6
7
8
9
10
11
12
注意: 截断可能导致语义不完整,谨慎使用!
Q5: 如何统计 Token 数量?
使用模型的 get_num_tokens
方法:
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")
# 单条消息
message = HumanMessage("Hello, world!")
tokens = llm.get_num_tokens(message.content)
print(f"Tokens: {tokens}")
# 消息列表
messages = [HumanMessage("Hi"), AIMessage("Hello!")]
total_tokens = llm.get_num_tokens_from_messages(messages)
print(f"Total tokens: {total_tokens}")
2
3
4
5
6
7
8
9
10
11
12
13
📖 扩展阅读
🎓 总结
消息管理是构建生产级聊天应用的关键技能。本教程介绍了三种核心技术:
技术 | 核心价值 | 使用场景 |
---|---|---|
RemoveMessage | 永久删除,节省内存 | 确定不需要的历史 |
消息过滤 | 简单高效,不修改状态 | 快速原型,固定规则 |
Token 裁剪 | 精确控制成本 | 生产环境,变长消息 |
关键要点:
- 理解场景:根据应用需求选择合适的技术
- 保留灵活性:状态保留完整历史,只在传递给 LLM 时裁剪
- 组合使用:实际应用中常常需要组合多种技术
- 监控 Token:使用 LangSmith 追踪实际 token 使用情况
- 渐进优化:从简单方案开始,根据需求逐步优化
通过掌握这些技术,你可以构建既高效又经济的长期记忆聊天机器人!