LangGraph State Reducers 详细解读
网站使用说明
📚 概述
本文档详细解读 LangGraph 中的 State Reducers(状态归约器)。这是 LangGraph 状态管理的核心机制,用于控制状态更新的方式。通过 Reducers,我们可以优雅地处理并行节点的状态冲突、消息追加、自定义聚合等复杂场景。
核心价值:
- 解决并行节点的状态冲突问题
- 提供灵活的状态更新策略
- 支持消息的追加、修改和删除
- 允许自定义状态聚合逻辑
📚 术语表
| 术语名称 | LangGraph 定义和解读 | Python 定义和说明 | 重要程度 |
|---|---|---|---|
| Reducer | LangGraph 中定义如何合并多个节点状态更新的函数,解决并行冲突问题 | 接收 (旧值, 新值) 两个参数并返回合并结果的函数 | ⭐⭐⭐⭐⭐ |
| operator.add | LangGraph 最常用的内置 Reducer,用于拼接列表或字符串 | Python operator 模块的加法运算符函数,add([1,2], [3]) = [1,2,3] | ⭐⭐⭐⭐⭐ |
| Annotated | 为类型添加 Reducer 元数据的语法,格式为 Annotated[类型, reducer函数] | Python typing 的泛型,如 Annotated[list[int], add] | ⭐⭐⭐⭐⭐ |
| add_messages | LangGraph 专门用于消息列表的 Reducer,支持追加、修改、删除消息 | 内置函数,通过消息 ID 智能合并,支持 RemoveMessage 删除操作 | ⭐⭐⭐⭐⭐ |
| MessagesState | LangGraph 预定义的状态基类,自动包含 messages 字段和 add_messages reducer | TypedDict 子类,包含 messages: Annotated[list, add_messages] | ⭐⭐⭐⭐⭐ |
| InvalidUpdateError | 并行节点同时更新同一字段但没有 Reducer 时抛出的错误 | LangGraph 异常类,表示状态更新冲突 | ⭐⭐⭐⭐ |
| RemoveMessage | 特殊消息类型,用于从状态中删除指定 ID 的消息 | LangChain 消息类,配合 add_messages 使用:RemoveMessage(id="123") | ⭐⭐⭐⭐⭐ |
| 消息重写 | add_messages 的特性:相同 ID 的新消息会覆盖旧消息 | 通过设置消息的 id 属性实现消息内容更新 | ⭐⭐⭐ |
| 自定义 Reducer | 用户定义的状态合并函数,可实现任意复杂逻辑 | 函数签名 def reducer(left, right) -> result,需处理 None 等边缘情况 | ⭐⭐⭐⭐ |
| 并行节点冲突 | 多个节点同时更新同一状态字段时产生的冲突,需要 Reducer 解决 | 无 Reducer 时会抛出 InvalidUpdateError,有 Reducer 则自动合并 | ⭐⭐⭐⭐⭐ |
🎯 核心概念
什么是 State Reducer?
Reducer(归约器) 是一个函数,用于定义如何将新值合并到现有状态中。
类比理解:
想象你有一个购物车(状态)
- 默认方式:每次都替换整个购物车(覆盖)
- Reducer 方式:可以选择"追加商品"、"删除商品"、"合并购物车"等策略LangGraph 中的应用:
- 当多个节点同时更新同一个状态字段时,Reducer 决定如何处理这些更新
- 避免状态冲突和数据丢失
- 提供一致的状态更新行为
⚠️ 问题场景:默认覆盖行为
场景 1:单节点更新(无问题)
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
foo: int
def node_1(state):
print("---Node 1---")
return {"foo": state['foo'] + 1}
# 构建图
builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
# 执行
result = graph.invoke({"foo": 1})
# 输出: {'foo': 2}生成的流程图:
说明:
- 默认情况下,节点返回的状态会 覆盖 原有状态
{"foo": 1}→node_1→{"foo": 2}- 单节点更新时没有问题
完整案例代码
以下是场景 1 的完整可运行代码:
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display
# ========== 1. 定义状态 ==========
class State(TypedDict):
foo: int
# ========== 2. 定义节点 ==========
def node_1(state):
"""单节点:将 foo 值加 1"""
print("---Node 1---")
return {"foo": state['foo'] + 1}
# ========== 3. 构建图 ==========
builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)
# ========== 4. 编译 ==========
graph = builder.compile()
# ========== 5. 可视化 ==========
display(Image(graph.get_graph().draw_mermaid_png()))
# ========== 6. 执行 ==========
result = graph.invoke({"foo": 1})
print(f"结果: {result}")
# 输出:
# ---Node 1---
# 结果: {'foo': 2}场景 2:并行节点冲突(有问题!)❌
class State(TypedDict):
foo: int
def node_1(state):
print("---Node 1---")
return {"foo": state['foo'] + 1}
def node_2(state):
print("---Node 2---")
return {"foo": state['foo'] + 1}
def node_3(state):
print("---Node 3---")
return {"foo": state['foo'] + 1}
# 构建图
builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
# 关键:node_1 分支到 node_2 和 node_3(并行执行)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_1", "node_3")
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
# 执行会报错!
from langgraph.errors import InvalidUpdateError
try:
graph.invoke({"foo": 1})
except InvalidUpdateError as e:
print(f"InvalidUpdateError occurred: {e}")生成的流程图:
执行流程:
输入: {"foo": 1}
↓
node_1: {"foo": 2}
↓
┌─────────┴─────────┐
node_2 node_3
{"foo": 3} {"foo": 3}
↓ ↓
└─────────┬─────────┘
❌ 冲突!问题分析:
node_1执行后,foo = 2node_2和node_3并行执行(在同一步)- 两个节点都尝试将
foo覆盖为3 - LangGraph 不知道应该保留哪个值 → InvalidUpdateError
完整案例代码
以下是场景 2 的完整可运行代码(演示并行冲突):
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.errors import InvalidUpdateError
from IPython.display import Image, display
# ========== 1. 定义状态(无 Reducer)==========
class State(TypedDict):
foo: int
# ========== 2. 定义节点 ==========
def node_1(state):
print("---Node 1---")
return {"foo": state['foo'] + 1}
def node_2(state):
print("---Node 2---")
return {"foo": state['foo'] + 1}
def node_3(state):
print("---Node 3---")
return {"foo": state['foo'] + 1}
# ========== 3. 构建图(并行结构)==========
builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
# node_1 后分支到 node_2 和 node_3(并行执行)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_1", "node_3")
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)
# ========== 4. 编译 ==========
graph = builder.compile()
# ========== 5. 可视化 ==========
display(Image(graph.get_graph().draw_mermaid_png()))
# ========== 6. 执行(会报错!)==========
try:
result = graph.invoke({"foo": 1})
except InvalidUpdateError as e:
print(f"❌ InvalidUpdateError: {e}")
print("原因:node_2 和 node_3 并行更新同一字段,产生冲突!")✅ 解决方案:使用 Reducer
方案 1:使用 operator.add Reducer
from operator import add
from typing import Annotated
class State(TypedDict):
foo: Annotated[list[int], add] # ⭐ 关键:使用 Annotated + add
def node_1(state):
print("---Node 1---")
return {"foo": [state['foo'][-1] + 1]}
def node_2(state):
print("---Node 2---")
return {"foo": [state['foo'][-1] + 1]}
def node_3(state):
print("---Node 3---")
return {"foo": [state['foo'][-1] + 1]}
# 构建图(同样的并行结构)
builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_1", "node_3")
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
# 执行成功!
result = graph.invoke({"foo": [1]})
# 输出: {'foo': [1, 2, 3, 3]}生成的流程图:
执行流程:
输入: {"foo": [1]}
↓
node_1: {"foo": [1, 2]} (追加 2)
↓
┌──────────┴──────────┐
node_2 node_3
追加 3 追加 3
↓ ↓
└──────────┬──────────┘
合并
{"foo": [1, 2, 3, 3]} ✅ 成功!关键点:
Annotated[list[int], add]指定使用operator.add作为 Reduceroperator.add对列表执行拼接操作:[1, 2] + [3] = [1, 2, 3]- 并行节点的返回值会被追加而不是覆盖
- 最终状态包含所有节点的更新
完整案例代码
以下是使用 operator.add Reducer 的完整可运行代码:
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display
# ========== 1. 定义状态(使用 Reducer)==========
class State(TypedDict):
# ⭐ 关键:使用 Annotated + add 指定 Reducer
foo: Annotated[list[int], add]
# ========== 2. 定义节点 ==========
def node_1(state):
"""第一个节点:追加计算结果"""
print("---Node 1---")
return {"foo": [state['foo'][-1] + 1]}
def node_2(state):
"""第二个节点(并行):追加计算结果"""
print("---Node 2---")
return {"foo": [state['foo'][-1] + 1]}
def node_3(state):
"""第三个节点(并行):追加计算结果"""
print("---Node 3---")
return {"foo": [state['foo'][-1] + 1]}
# ========== 3. 构建图(并行结构)==========
builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_1", "node_3")
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)
# ========== 4. 编译 ==========
graph = builder.compile()
# ========== 5. 可视化 ==========
display(Image(graph.get_graph().draw_mermaid_png()))
# ========== 6. 执行 ==========
result = graph.invoke({"foo": [1]})
print(f"结果: {result}")
# 输出:
# ---Node 1---
# ---Node 2---
# ---Node 3---
# 结果: {'foo': [1, 2, 3, 3]}
# 解释: 初始[1] → node_1追加[2] → node_2和node_3并行各追加[3]🔬 深入理解 Annotated 和 Reducer
Python 知识点:Annotated 类型
from typing import Annotated
# 基本用法
Annotated[类型, 元数据1, 元数据2, ...]
# 在 LangGraph 中
Annotated[list[int], add]
# ^^^^^^^^^ ^^^
# 类型定义 Reducer 函数Annotated 的作用:
- 类型提示:
list[int]告诉 IDE 和类型检查器这是一个整数列表 - 元数据:
add是附加的元数据,LangGraph 用它来确定状态更新策略 - 不影响运行时类型:Python 运行时仍然认为它是
list[int]
Python 知识点:operator.add
import operator
# 对数字:执行加法
operator.add(1, 2) # → 3
# 对字符串:执行拼接
operator.add("hello", " world") # → "hello world"
# 对列表:执行列表拼接
operator.add([1, 2], [3, 4]) # → [1, 2, 3, 4]
# 在 LangGraph 中
# 旧状态: [1, 2]
# 新更新: [3]
# 结果: operator.add([1, 2], [3]) → [1, 2, 3]为什么选择 operator.add?
- 是 Python 内置模块,性能好
- 语义清晰:将新值"加"到旧值上
- 对列表的操作符合直觉(拼接)
🛠️ 自定义 Reducer
问题:operator.add 无法处理 None
from operator import add
class State(TypedDict):
foo: Annotated[list[int], add]
def node_1(state):
return {"foo": [2]}
graph = ... # 构建图
# 传入 None 会报错
try:
graph.invoke({"foo": None})
except TypeError as e:
print(f"TypeError occurred: {e}")
# TypeError: unsupported operand type(s) for +: 'NoneType' and 'list'原因:
operator.add(None, [2])无法执行None + list是非法操作
解决方案:自定义 Reducer 函数
def reduce_list(left: list | None, right: list | None) -> list:
"""
安全地合并两个列表,处理 None 的情况
Args:
left: 旧状态(可能是 None)
right: 新更新(可能是 None)
Returns:
合并后的列表
"""
if not left:
left = []
if not right:
right = []
return left + right
# 使用自定义 Reducer
class CustomReducerState(TypedDict):
foo: Annotated[list[int], reduce_list] # ⭐ 使用自定义函数
def node_1(state):
print("---Node 1---")
return {"foo": [2]}
# 构建图
builder = StateGraph(CustomReducerState)
builder.add_node("node_1", node_1)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
# 现在可以处理 None 了!
result = graph.invoke({"foo": None})
# 输出: {'foo': [2]}生成的流程图:
自定义 Reducer 的优势:
- 可以实现任意复杂的合并逻辑
- 处理边缘情况(如 None、空值)
- 更符合业务需求
完整案例代码
以下是自定义 Reducer 的完整可运行代码:
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display
# ========== 1. 定义自定义 Reducer ==========
def reduce_list(left: list | None, right: list | None) -> list:
"""
安全地合并两个列表,处理 None 的情况
"""
if not left:
left = []
if not right:
right = []
return left + right
# ========== 2. 定义状态(使用自定义 Reducer)==========
class CustomReducerState(TypedDict):
foo: Annotated[list[int], reduce_list] # ⭐ 使用自定义函数
# ========== 3. 定义节点 ==========
def node_1(state):
print("---Node 1---")
return {"foo": [2]}
def node_2(state):
print("---Node 2---")
return {"foo": [3, 4]}
# ========== 4. 构建图 ==========
builder = StateGraph(CustomReducerState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", END)
# ========== 5. 编译 ==========
graph = builder.compile()
# ========== 6. 可视化 ==========
display(Image(graph.get_graph().draw_mermaid_png()))
# ========== 7. 测试 None 处理 ==========
print("测试 None 输入:")
result = graph.invoke({"foo": None})
print(f"结果: {result}")
# 输出: {'foo': [2, 3, 4]}
print("\n测试正常输入:")
result = graph.invoke({"foo": [1]})
print(f"结果: {result}")
# 输出: {'foo': [1, 2, 3, 4]}自定义 Reducer 示例
示例 1:只保留唯一值
def unique_reducer(left: list | None, right: list | None) -> list:
"""合并列表并去重"""
if not left:
left = []
if not right:
right = []
return list(set(left + right))
class State(TypedDict):
tags: Annotated[list[str], unique_reducer]
# 使用
# 旧状态: {"tags": ["python", "ai"]}
# 新更新: {"tags": ["python", "langchain"]}
# 结果: {"tags": ["python", "ai", "langchain"]} # 自动去重示例 2:保留最新的 N 个值
def keep_last_n(n: int):
"""工厂函数:创建一个只保留最新 n 个值的 Reducer"""
def reducer(left: list | None, right: list | None) -> list:
if not left:
left = []
if not right:
right = []
combined = left + right
return combined[-n:] # 只保留最后 n 个
return reducer
class State(TypedDict):
history: Annotated[list[str], keep_last_n(5)] # 只保留最新 5 个
# 使用
# 旧状态: {"history": ["a", "b", "c", "d", "e"]}
# 新更新: {"history": ["f", "g"]}
# 结果: {"history": ["c", "d", "e", "f", "g"]} # 只保留最新 5 个示例 3:合并字典
def merge_dict_reducer(left: dict | None, right: dict | None) -> dict:
"""深度合并两个字典"""
if not left:
left = {}
if not right:
right = {}
return {**left, **right}
class State(TypedDict):
config: Annotated[dict, merge_dict_reducer]
# 使用
# 旧状态: {"config": {"a": 1, "b": 2}}
# 新更新: {"config": {"b": 3, "c": 4}}
# 结果: {"config": {"a": 1, "b": 3, "c": 4}} # b 被覆盖,c 被添加💬 Messages Reducer:处理对话消息
内置的 add_messages Reducer
LangGraph 提供了专门用于处理消息的 Reducer:add_messages
两种使用方式:
方式 1:手动定义
from typing import Annotated
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
class CustomMessagesState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
user_name: str
session_id: str方式 2:使用 MessagesState(推荐)
from langgraph.graph import MessagesState
# MessagesState 已经内置了 messages 字段和 add_messages reducer
class ExtendedMessagesState(MessagesState):
# 只需要添加额外的字段
user_name: str
session_id: strMessagesState 的优势:
- 自动包含
messages: Annotated[list[AnyMessage], add_messages] - 减少样板代码
- 更清晰的意图表达
add_messages 的基本用法
from langgraph.graph.message import add_messages
from langchain_core.messages import AIMessage, HumanMessage
# 初始状态
initial_messages = [
AIMessage(content="Hello! How can I assist you?", name="Model"),
HumanMessage(content="I'm looking for information on marine biology.", name="Lance")
]
# 新消息
new_message = AIMessage(
content="Sure, I can help with that. What specifically are you interested in?",
name="Model"
)
# 追加消息
result = add_messages(initial_messages, new_message)
# 结果
# [
# AIMessage(content="Hello! How can I assist you?", name="Model"),
# HumanMessage(content="I'm looking for information on marine biology.", name="Lance"),
# AIMessage(content="Sure, I can help with that...", name="Model") # ✅ 追加成功
# ]完整案例代码
以下是 add_messages 基本用法的完整可运行代码:
from langgraph.graph.message import add_messages
from langchain_core.messages import AIMessage, HumanMessage
# ========== 1. 准备初始消息 ==========
initial_messages = [
AIMessage(content="Hello! How can I assist you?", name="Model"),
HumanMessage(content="I'm looking for information on marine biology.", name="Lance")
]
print("初始消息:")
for msg in initial_messages:
print(f" [{msg.name}]: {msg.content}")
# ========== 2. 准备新消息 ==========
new_message = AIMessage(
content="Sure, I can help with that. What specifically are you interested in?",
name="Model"
)
print(f"\n新消息:")
print(f" [{new_message.name}]: {new_message.content}")
# ========== 3. 使用 add_messages 追加 ==========
result = add_messages(initial_messages, new_message)
print(f"\n合并后的消息 (共 {len(result)} 条):")
for i, msg in enumerate(result):
print(f" {i+1}. [{msg.name}]: {msg.content[:50]}...")
# 输出:
# 合并后的消息 (共 3 条):
# 1. [Model]: Hello! How can I assist you?
# 2. [Lance]: I'm looking for information on marine biology.
# 3. [Model]: Sure, I can help with that. What specifically...高级功能 1:消息重写(Re-writing)
核心机制: 如果新消息的 id 与现有消息相同,会覆盖旧消息
# 初始状态
initial_messages = [
AIMessage(content="Hello! How can I assist you?", name="Model", id="1"),
HumanMessage(content="I'm looking for information on marine biology.", name="Lance", id="2")
]
# 新消息(注意 id="2" 与第二条消息相同)
new_message = HumanMessage(
content="I'm looking for information on whales, specifically",
name="Lance",
id="2" # ⭐ 相同的 ID
)
# 重写消息
result = add_messages(initial_messages, new_message)
# 结果
# [
# AIMessage(content="Hello! How can I assist you?", name="Model", id="1"),
# HumanMessage(content="I'm looking for information on whales, specifically", name="Lance", id="2")
# # ✅ 第二条消息被重写
# ]应用场景:
- 修正用户输入
- 更新 AI 响应
- 实现消息编辑功能
完整案例代码
以下是消息重写功能的完整可运行代码:
from langgraph.graph.message import add_messages
from langchain_core.messages import AIMessage, HumanMessage
# ========== 1. 准备初始消息(带 ID)==========
initial_messages = [
AIMessage(content="Hello! How can I assist you?", name="Model", id="1"),
HumanMessage(content="I'm looking for information on marine biology.", name="Lance", id="2")
]
print("初始消息:")
for msg in initial_messages:
print(f" [id={msg.id}] {msg.name}: {msg.content}")
# ========== 2. 创建要重写的消息(相同 ID)==========
new_message = HumanMessage(
content="I'm looking for information on whales, specifically",
name="Lance",
id="2" # ⭐ 相同的 ID 会触发重写
)
print(f"\n新消息 (用于重写):")
print(f" [id={new_message.id}] {new_message.name}: {new_message.content}")
# ========== 3. 执行重写 ==========
result = add_messages(initial_messages, new_message)
print(f"\n重写后的消息:")
for msg in result:
print(f" [id={msg.id}] {msg.name}: {msg.content}")
# 输出:
# 重写后的消息:
# [id=1] Model: Hello! How can I assist you?
# [id=2] Lance: I'm looking for information on whales, specifically
# ✅ 第二条消息被重写(内容从 marine biology 变为 whales)高级功能 2:消息删除(Removal)
使用 RemoveMessage:
from langchain_core.messages import RemoveMessage, AIMessage, HumanMessage
# 消息列表
messages = [
AIMessage("Hi.", name="Bot", id="1"),
HumanMessage("Hi.", name="Lance", id="2"),
AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"),
HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4")
]
# 创建删除指令(删除前两条消息)
delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]]
# [RemoveMessage(id="1"), RemoveMessage(id="2")]
# 执行删除
result = add_messages(messages, delete_messages)
# 结果
# [
# AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"),
# HumanMessage("Yes, I know about whales...", name="Lance", id="4")
# ]
# ✅ id="1" 和 id="2" 的消息被删除应用场景:
- 实现滑动窗口(只保留最近 N 条消息)
- 删除敏感信息
- 清理无关对话历史
- 优化上下文长度(减少 Token 消耗)
完整案例代码
以下是消息删除功能的完整可运行代码:
from langgraph.graph.message import add_messages
from langchain_core.messages import RemoveMessage, AIMessage, HumanMessage
# ========== 1. 准备消息列表 ==========
messages = [
AIMessage("Hi.", name="Bot", id="1"),
HumanMessage("Hi.", name="Lance", id="2"),
AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"),
HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4")
]
print("原始消息:")
for msg in messages:
print(f" [id={msg.id}] {msg.name}: {msg.content[:40]}...")
# ========== 2. 创建删除指令 ==========
# 删除前两条消息(id="1" 和 id="2")
delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]]
print(f"\n删除指令: {[f'RemoveMessage(id={d.id})' for d in delete_messages]}")
# ========== 3. 执行删除 ==========
result = add_messages(messages, delete_messages)
print(f"\n删除后的消息 (共 {len(result)} 条):")
for msg in result:
print(f" [id={msg.id}] {msg.name}: {msg.content[:40]}...")
# 输出:
# 删除后的消息 (共 2 条):
# [id=3] Bot: So you said you were researching ocean...
# [id=4] Lance: Yes, I know about whales. But what oth...
# ✅ id="1" 和 id="2" 的消息已被删除实战案例:滑动窗口对话
from langgraph.graph import MessagesState, StateGraph, START, END
from langchain_core.messages import RemoveMessage, HumanMessage, AIMessage
class ConversationState(MessagesState):
max_messages: int = 10 # 最多保留 10 条消息
def chat_node(state: ConversationState):
# 模拟 AI 响应
response = AIMessage(content="I understand your question.", name="Bot")
return {"messages": [response]}
def cleanup_node(state: ConversationState):
"""清理旧消息,只保留最新的 max_messages 条"""
messages = state["messages"]
max_msgs = state.get("max_messages", 10)
if len(messages) > max_msgs:
# 计算需要删除的消息数量
num_to_delete = len(messages) - max_msgs
# 创建删除指令
delete_messages = [RemoveMessage(id=m.id) for m in messages[:num_to_delete]]
return {"messages": delete_messages}
return {}
# 构建图
builder = StateGraph(ConversationState)
builder.add_node("chat", chat_node)
builder.add_node("cleanup", cleanup_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", "cleanup")
builder.add_edge("cleanup", END)
graph = builder.compile()
# 🎨 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
# 使用
initial_state = {
"messages": [HumanMessage(content="Hello", id=str(i)) for i in range(12)],
"max_messages": 10
}
result = graph.invoke(initial_state)
# 结果只保留最新 10 条消息生成的流程图:
完整案例代码
以下是滑动窗口对话的完整可运行代码:
from langgraph.graph import MessagesState, StateGraph, START, END
from langchain_core.messages import RemoveMessage, HumanMessage, AIMessage
from IPython.display import Image, display
# ========== 1. 定义状态 ==========
class ConversationState(MessagesState):
max_messages: int # 最多保留的消息数
# ========== 2. 定义节点 ==========
def chat_node(state: ConversationState):
"""模拟 AI 响应"""
print(f" chat_node: 收到 {len(state['messages'])} 条消息")
response = AIMessage(content="I understand your question.", name="Bot")
return {"messages": [response]}
def cleanup_node(state: ConversationState):
"""清理旧消息,只保留最新的 max_messages 条"""
messages = state["messages"]
max_msgs = state.get("max_messages", 10)
if len(messages) > max_msgs:
num_to_delete = len(messages) - max_msgs
delete_messages = [RemoveMessage(id=m.id) for m in messages[:num_to_delete]]
print(f" cleanup_node: 删除 {num_to_delete} 条旧消息")
return {"messages": delete_messages}
print(f" cleanup_node: 无需清理 ({len(messages)} <= {max_msgs})")
return {}
# ========== 3. 构建图 ==========
builder = StateGraph(ConversationState)
builder.add_node("chat", chat_node)
builder.add_node("cleanup", cleanup_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", "cleanup")
builder.add_edge("cleanup", END)
# ========== 4. 编译 ==========
graph = builder.compile()
# ========== 5. 可视化 ==========
display(Image(graph.get_graph().draw_mermaid_png()))
# ========== 6. 测试滑动窗口 ==========
# 创建 12 条初始消息
initial_state = {
"messages": [HumanMessage(content=f"Message {i}", id=str(i)) for i in range(12)],
"max_messages": 10
}
print(f"初始消息数: {len(initial_state['messages'])}")
print(f"最大保留数: {initial_state['max_messages']}")
print("\n执行图:")
result = graph.invoke(initial_state)
print(f"\n最终消息数: {len(result['messages'])}")
print("保留的消息 ID:", [m.id for m in result['messages']])
# 输出: 最终消息数: 10
# 保留的消息 ID: ['3', '4', '5', '6', '7', '8', '9', '10', '11', '<新消息ID>']
# ✅ 最早的 3 条消息被删除(id=0,1,2),保留最新 10 条🎓 核心知识点总结
LangGraph 特有概念
1. Reducer 机制
| 特性 | 说明 |
|---|---|
| 作用 | 定义状态更新策略 |
| 触发时机 | 多个节点更新同一字段时 |
| 默认行为 | 覆盖(会导致并行冲突) |
| Reducer 行为 | 自定义合并逻辑(如追加、去重) |
2. 内置 Reducer
| Reducer | 用途 | 示例 |
|---|---|---|
operator.add | 拼接列表/字符串 | [1, 2] + [3] = [1, 2, 3] |
add_messages | 追加/修改/删除消息 | 对话历史管理 |
3. MessagesState
# 手动方式
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
# 快捷方式(推荐)
class State(MessagesState):
pass # 自动包含 messages 字段Python 特有知识点
1. Annotated 类型
from typing import Annotated
Annotated[基础类型, 元数据1, 元数据2, ...]
# 在 LangGraph 中
Annotated[list[int], add]
# ^^^^^^^^^ ^^^
# 类型 Reducer作用:
- 提供类型提示(IDE 支持)
- 附加元数据(LangGraph 用于确定 Reducer)
- 不影响运行时行为
2. TypedDict vs Pydantic BaseModel
| 特性 | TypedDict | BaseModel |
|---|---|---|
| 运行时验证 | ❌ | ✅ |
| 类型提示 | ✅ | ✅ |
| 性能 | 更快 | 稍慢 |
| 用途 | 状态定义 | 数据模型、API 输出 |
3. operator 模块
import operator
operator.add(a, b) # a + b
operator.sub(a, b) # a - b
operator.mul(a, b) # a * b
operator.truediv(a, b) # a / b在 LangGraph 中的应用:
operator.add用于列表拼接- 是 Python 内置模块,无需安装
4. 类型提示中的 Union
from typing import Union
# Python 3.10+ 可以使用 | 语法
def reduce_list(left: list | None, right: list | None) -> list:
...
# 等价于(旧语法)
def reduce_list(left: Union[list, None], right: Union[list, None]) -> list:
...💡 最佳实践
1. 何时使用 Reducer?
✅ 必须使用的场景:
- 图中存在并行节点
- 多个节点可能更新同一个状态字段
- 需要追加而非覆盖(如消息历史)
- 需要自定义聚合逻辑(如去重、求和)
❌ 不需要使用的场景:
- 完全串行的图(无并行节点)
- 每个字段只被一个节点更新
- 覆盖行为符合预期
2. Reducer 设计原则
原则 1:明确语义
# ✅ 好的设计 - 语义清晰
class State(TypedDict):
message_history: Annotated[list, add_messages] # 消息历史:追加
calculation_results: Annotated[list, operator.add] # 计算结果:拼接
user_count: Annotated[int, max] # 用户数:取最大值
# ❌ 不好的设计 - 语义不明
class State(TypedDict):
data: Annotated[list, add] # data 是什么?为什么追加?原则 2:处理边缘情况
# ✅ 好的设计 - 处理 None
def safe_reducer(left: list | None, right: list | None) -> list:
if not left:
left = []
if not right:
right = []
return left + right
# ❌ 不好的设计 - 可能崩溃
def unsafe_reducer(left: list, right: list) -> list:
return left + right # 如果传入 None 会报错原则 3:保持幂等性(如果可能)
# ✅ 幂等的 Reducer
def unique_add(left: list, right: list) -> list:
"""多次添加相同元素只保留一个"""
return list(set(left + right))
# 应用:即使重复执行,结果一致
state = ["a", "b"]
state = unique_add(state, ["b", "c"]) # ["a", "b", "c"]
state = unique_add(state, ["b", "c"]) # ["a", "b", "c"] - 相同结果3. 消息管理最佳实践
实践 1:定期清理消息
def should_cleanup(state: MessagesState) -> bool:
"""每 10 条消息清理一次"""
return len(state["messages"]) % 10 == 0
builder.add_conditional_edges(
"chat",
should_cleanup,
{True: "cleanup", False: END}
)实践 2:保留系统消息
from langchain_core.messages import SystemMessage
def cleanup_node(state):
messages = state["messages"]
# 保留系统消息和最新 5 条对话
system_messages = [m for m in messages if isinstance(m, SystemMessage)]
recent_messages = [m for m in messages if not isinstance(m, SystemMessage)][-5:]
# 删除其他消息
to_delete = [m for m in messages if m not in system_messages + recent_messages]
delete_ops = [RemoveMessage(id=m.id) for m in to_delete]
return {"messages": delete_ops}实践 3:实现消息编辑
def edit_message_node(state):
"""允许用户编辑最后一条消息"""
last_message = state["messages"][-1]
# 创建新消息,使用相同的 ID
edited_message = HumanMessage(
content=state["edited_content"],
id=last_message.id, # ⭐ 相同 ID = 覆盖
name=last_message.name
)
return {"messages": [edited_message]}4. 性能优化技巧
技巧 1:避免过大的列表
# ❌ 不好 - 列表无限增长
class State(TypedDict):
all_results: Annotated[list, operator.add]
# ✅ 好 - 限制大小
def limited_add(max_size: int):
def reducer(left: list | None, right: list | None) -> list:
if not left:
left = []
if not right:
right = []
combined = left + right
return combined[-max_size:] # 只保留最新的
return reducer
class State(TypedDict):
recent_results: Annotated[list, limited_add(100)]技巧 2:使用高效的数据结构
# 如果需要频繁查找
def set_reducer(left: set | None, right: set | None) -> set:
"""使用集合实现 O(1) 查找"""
if not left:
left = set()
if not right:
right = set()
return left | right # 集合并集
class State(TypedDict):
unique_ids: Annotated[set, set_reducer]🚀 进阶技巧
1. 条件性 Reducer
根据状态动态选择 Reducer:
def smart_reducer(left: list, right: list, state: dict) -> list:
"""根据状态选择合并策略"""
if state.get("mode") == "append":
return left + right
elif state.get("mode") == "unique":
return list(set(left + right))
elif state.get("mode") == "override":
return right
else:
return left + right2. 组合多个 Reducer
def compose_reducers(*reducers):
"""组合多个 Reducer"""
def combined(left, right):
result = left
for reducer in reducers:
result = reducer(result, right)
return result
return combined
# 使用
validate_and_add = compose_reducers(
validate_list, # 先验证
deduplicate, # 再去重
operator.add # 最后追加
)3. 带日志的 Reducer
def logged_reducer(reducer_func, name: str):
"""为 Reducer 添加日志"""
def wrapper(left, right):
print(f"[{name}] Merging: {left} + {right}")
result = reducer_func(left, right)
print(f"[{name}] Result: {result}")
return result
return wrapper
# 使用
class State(TypedDict):
data: Annotated[list, logged_reducer(operator.add, "DataReducer")]📊 Reducer 模式对比
| 模式 | 实现 | 适用场景 | 示例 |
|---|---|---|---|
| 覆盖 | 默认行为 | 单节点更新 | 配置覆盖 |
| 追加 | operator.add | 收集结果 | 日志收集 |
| 去重追加 | 自定义 | 唯一性约束 | 标签集合 |
| 消息管理 | add_messages | 对话历史 | 聊天机器人 |
| 数值聚合 | max/min/sum | 统计计算 | 性能指标 |
🎯 实际应用案例
案例 1:并行 API 调用结果收集
from typing import Annotated
from operator import add
class State(TypedDict):
query: str
api_results: Annotated[list[dict], add]
def call_api_1(state):
result = {"source": "API1", "data": "..."}
return {"api_results": [result]}
def call_api_2(state):
result = {"source": "API2", "data": "..."}
return {"api_results": [result]}
# 两个 API 并行调用,结果自动合并
# 最终: {"api_results": [{"source": "API1", ...}, {"source": "API2", ...}]}案例 2:聊天机器人状态管理
from langgraph.graph import MessagesState
class ChatState(MessagesState):
user_id: str
session_id: str
context: dict
def chat_node(state: ChatState):
# 生成响应
response = generate_response(state["messages"][-1])
return {"messages": [response]}
def memory_cleanup_node(state: ChatState):
# 只保留最近 20 条消息
messages = state["messages"]
if len(messages) > 20:
to_delete = messages[:-20]
return {"messages": [RemoveMessage(id=m.id) for m in to_delete]}
return {}案例 3:文档分析结果聚合
class AnalysisState(TypedDict):
document: str
insights: Annotated[list[str], operator.add]
metrics: Annotated[dict, merge_dict]
def technical_analysis(state):
insight = "Technical complexity: High"
metrics = {"lines_of_code": 1000}
return {"insights": [insight], "metrics": metrics}
def business_analysis(state):
insight = "Business value: Medium"
metrics = {"estimated_revenue": 50000}
return {"insights": [insight], "metrics": metrics}
# 并行分析,结果自动合并
# insights: ["Technical complexity: High", "Business value: Medium"]
# metrics: {"lines_of_code": 1000, "estimated_revenue": 50000}完整案例代码(可直接运行)
以下是一个完整的 State Reducers 示例,展示并行节点如何通过 Reducer 正确合并状态:
## 完整案例代码(可直接运行)
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
# 1. 定义状态 - 使用 Annotated 指定 Reducer
class State(TypedDict):
# foo 字段使用 operator.add 作为 Reducer
# 这样并行节点的返回值会被追加而不是覆盖
foo: Annotated[list[int], add]
# 2. 定义节点函数
def node_1(state):
"""第一个节点:追加一个计算结果"""
print("---Node 1---")
current_value = state['foo'][-1] if state['foo'] else 0
return {"foo": [current_value + 1]}
def node_2(state):
"""第二个节点(并行执行):追加另一个计算结果"""
print("---Node 2---")
current_value = state['foo'][-1] if state['foo'] else 0
return {"foo": [current_value + 10]}
def node_3(state):
"""第三个节点(并行执行):追加第三个计算结果"""
print("---Node 3---")
current_value = state['foo'][-1] if state['foo'] else 0
return {"foo": [current_value + 100]}
# 3. 构建图
builder = StateGraph(State)
# 添加节点
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
# 添加边 - node_1 后分支到 node_2 和 node_3(并行执行)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_1", "node_3")
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)
# 4. 编译图
graph = builder.compile()
# 5. 可视化图结构
print("=== 图结构 ===")
display(Image(graph.get_graph().draw_mermaid_png()))
# 6. 执行图
print("\n=== 执行过程 ===")
result = graph.invoke({"foo": [0]})
# 7. 查看结果
print("\n=== 最终结果 ===")
print(f"foo: {result['foo']}")
# 预期输出: foo: [0, 1, 11, 101]
# 解释:
# - 初始: [0]
# - node_1 追加: [0, 1] (0 + 1 = 1)
# - node_2 追加: 11 (1 + 10 = 11)
# - node_3 追加: 101 (1 + 100 = 101)
# - 最终合并: [0, 1, 11, 101]
print("\n=== Reducer 机制演示成功!===")
print("并行节点 node_2 和 node_3 的结果通过 operator.add 自动合并,而不是冲突")🔍 常见问题
Q1: Reducer 和普通函数有什么区别?
Reducer:
- 专门用于状态合并
- 接受两个参数:旧状态和新更新
- 返回合并后的状态
- 由 LangGraph 自动调用
普通函数:
- 可以做任何事情
- 参数和返回值任意
- 需要手动调用
Q2: 为什么并行节点必须使用 Reducer?
因为并行节点在同一步执行,它们的更新是同时发生的。如果没有 Reducer:
node_2 更新: {"foo": 3}
node_3 更新: {"foo": 3}
问题:应该保留哪个?Reducer 提供了明确的合并策略,避免冲突。
Q3: add_messages 如何知道哪些消息应该覆盖?
通过消息的 id 字段:
# 如果新消息的 id 与现有消息相同,就覆盖
new_msg = HumanMessage(content="Updated", id="123")
old_msgs = [HumanMessage(content="Original", id="123")]
result = add_messages(old_msgs, new_msg)
# 结果: [HumanMessage(content="Updated", id="123")]Q4: 可以为不同字段使用不同的 Reducer 吗?
可以!每个字段都可以有自己的 Reducer:
class State(TypedDict):
messages: Annotated[list, add_messages] # 消息:追加
scores: Annotated[list, operator.add] # 分数:拼接
max_score: Annotated[int, max] # 最高分:取最大值
flags: Annotated[set, set_union] # 标志:集合并集Q5: Reducer 会影响性能吗?
会有轻微影响,但通常可以忽略:
operator.add:非常快(Python 内置)add_messages:需要检查 ID,稍慢但仍高效- 自定义 Reducer:取决于实现复杂度
优化建议:
- 避免在 Reducer 中进行复杂计算
- 使用高效的数据结构(如 set 而非 list)
- 限制状态大小(如只保留最新 N 项)
📖 扩展阅读
🎉 总结
State Reducers 是 LangGraph 状态管理的核心机制,它解决了以下关键问题:
- 并行节点冲突 - 通过定义合并策略避免状态冲突
- 灵活的状态更新 - 支持追加、覆盖、去重等多种模式
- 消息管理 - 内置的
add_messages提供追加、修改、删除功能 - 自定义逻辑 - 可以实现任意复杂的状态聚合逻辑
关键要点:
- 使用
Annotated[类型, Reducer函数]定义 Reducer operator.add用于列表/字符串拼接add_messages用于消息历史管理- 自定义 Reducer 处理特殊需求
- 必须处理
None等边缘情况
掌握 State Reducers 是构建复杂 LangGraph 应用的基础,它让状态管理变得优雅而强大!