6.3 Double Texting - 详细解读
一、概述
1.1 本节简介
在真实的生产环境中,用户经常会快速连续发送多条消息,而不是耐心等待 AI 回复完成。这种行为被称为 Double Texting(双击发送/连发消息)。
典型场景:
用户:"帮我添加一个待办事项..."
↓ (AI 正在处理)
用户:"哦,等等,还要添加另一个..."
↓ (第一个还没完成!)
用户:"算了,改成..."
如果系统不能优雅地处理这种情况,会导致:
- 请求冲突
- 数据不一致
- 用户体验差
- 系统崩溃
1.2 为什么 Double Texting 是个问题?
问题的本质:并发访问同一个线程
Thread (线程)
↓
Run 1 正在执行
↓ (修改状态)
Run 2 也想开始
↓ (也要修改状态)
冲突!
可能的后果:
- 数据竞争(Race Condition)
- 状态不一致
- 部分更新丢失
- 执行顺序混乱
1.3 四种处理策略
LangGraph Platform 提供了四种策略来处理 double texting:
处理策略
├── 1. Reject(拒绝)
│ └── 拒绝新请求,返回错误
│
├── 2. Enqueue(排队)
│ └── 将新请求加入队列,顺序执行
│
├── 3. Interrupt(中断)
│ └── 中断当前运行,保存进度,执行新请求
│
└── 4. Rollback(回滚)
└── 删除第一个运行,只执行第二个
1.4 策略对比速览
策略 | 第一个请求 | 第二个请求 | 用户看到 | 适用场景 |
---|---|---|---|---|
Reject | ✅ 完成 | ❌ 被拒绝 | 只有第一个结果 | 严格顺序控制 |
Enqueue | ✅ 完成 | ✅ 完成 | 两个结果都有 | 所有请求都重要 |
Interrupt | ⚠️ 中断但保存 | ✅ 完成 | 第二个结果 + 第一个历史 | 用户纠正/补充 |
Rollback | ❌ 删除 | ✅ 完成 | 只有第二个结果 | 用户完全改变主意 |
1.5 学习目标
通过本节学习,你将掌握:
理解 Double Texting 问题
- 什么是 double texting
- 为什么需要处理
- 实际应用场景
四种策略的使用
- Reject:如何拒绝并发请求
- Enqueue:如何排队处理
- Interrupt:如何中断和继续
- Rollback:如何回滚重来
策略选择
- 不同场景选择不同策略
- 权衡利弊
- 最佳实践
二、环境准备
2.1 安装 SDK
%%capture --no-stderr
%pip install -U langgraph_sdk
2.2 连接到部署
from langgraph_sdk import get_client
url_for_cli_deployment = "http://localhost:8123"
client = get_client(url=url_for_cli_deployment)
前提条件:
- 部署已启动(
docker compose up
) task_maistro
图已部署- API 可访问
三、策略 1:Reject(拒绝)
3.1 概念
Reject 策略:拒绝任何新的并发请求,直到当前运行完成。
用户请求 1
↓
Run 1 开始执行
↓ (执行中...)
用户请求 2
↓
❌ 拒绝!返回 409 Conflict
↓
Run 1 继续执行
↓
Run 1 完成
↓
现在可以接受新请求
3.2 实现代码
import httpx
from langchain_core.messages import HumanMessage
# 创建线程
thread = await client.threads.create()
# 准备两个请求
user_input_1 = "Add a ToDo to follow-up with DI Repairs."
user_input_2 = "Add a ToDo to mount dresser to the wall."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"
# 创建第一个运行
run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_1)]},
config=config,
)
# 尝试创建第二个运行(会被拒绝)
try:
await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_2)]},
config=config,
multitask_strategy="reject", # 关键参数!
)
except httpx.HTTPStatusError as e:
print("Failed to start concurrent run", e)
输出:
Failed to start concurrent run Client error '409 Conflict' for url 'http://localhost:8123/threads/2b58630e-00fd-4c35-afad-a6b59e9b9104/runs'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/409
3.3 代码详解
关键参数:
multitask_strategy="reject"
这个参数告诉服务器:
- 如果线程上已有运行在执行
- 拒绝这个新请求
- 返回 HTTP 409 Conflict
HTTP 状态码 409:
- 含义:Conflict(冲突)
- 场景:资源状态冲突
- 处理:客户端应该等待并重试
3.4 验证第一个运行完成
from langchain_core.messages import convert_to_messages
# 等待第一个运行完成
await client.runs.join(thread["thread_id"], run["run_id"])
# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()
输出:
================================ Human Message =================================
Add a ToDo to follow-up with DI Repairs.
================================== Ai Message ==================================
Tool Calls:
UpdateMemory (call_6xqHubCPNufS0bg4tbUxC0FU)
Call ID: call_6xqHubCPNufS0bg4tbUxC0FU
Args:
update_type: todo
================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Follow-up with DI Repairs', 'time_to_complete': 30, 'deadline': None, ...}
================================== Ai Message ==================================
I've added a task to follow-up with DI Repairs to your ToDo list.
观察:
- ✅ 第一个请求成功完成
- ❌ 第二个请求被拒绝,没有在历史中
- 线程只包含第一个请求的内容
3.5 Reject 策略的特点
优点:
- ✅ 简单明了
- ✅ 避免冲突
- ✅ 保持执行顺序
- ✅ 资源使用可控
缺点:
- ❌ 用户的第二个请求丢失
- ❌ 需要客户端处理 409 错误
- ❌ 用户体验可能不好(请求被拒绝)
3.6 使用场景
适合 Reject 的场景:
严格顺序要求
银行转账:必须逐个处理,不能并发
资源受限
昂贵的 API 调用:避免浪费资源
明确告知用户等待
UI 显示"正在处理中,请稍候..." 禁用发送按钮
客户端处理示例:
async def create_run_with_retry(client, thread_id, assistant_id, input, config, max_retries=3):
"""创建运行,如果遇到 409 则重试"""
for attempt in range(max_retries):
try:
return await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy="reject"
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 409 and attempt < max_retries - 1:
print(f"Conflict, retrying... (attempt {attempt + 1})")
await asyncio.sleep(1) # 等待 1 秒
else:
raise
四、策略 2:Enqueue(排队)
4.1 概念
Enqueue 策略:将新请求加入队列,等当前运行完成后按顺序执行。
用户请求 1
↓
Run 1 开始执行
↓
用户请求 2
↓
✅ 接受并加入队列
↓
Run 1 完成
↓
Run 2 自动开始
↓
Run 2 完成
↓
两个请求都处理完成
4.2 实现代码
# 创建新线程
thread = await client.threads.create()
# 准备两个请求
user_input_1 = "Send Erik his t-shirt gift this weekend."
user_input_2 = "Get cash and pay nanny for 2 weeks. Do this by Friday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"
# 创建第一个运行
first_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_1)]},
config=config,
)
# 创建第二个运行(会被排队)
second_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_2)]},
config=config,
multitask_strategy="enqueue", # 排队策略
)
# 等待第二个运行完成(会自动等第一个完成)
await client.runs.join(thread["thread_id"], second_run["run_id"])
# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()
输出:
================================ Human Message =================================
Send Erik his t-shirt gift this weekend.
================================== Ai Message ==================================
Tool Calls:
UpdateMemory (call_svTeXPmWGTLY8aQ8EifjwHAa)
Call ID: call_svTeXPmWGTLY8aQ8EifjwHAa
Args:
update_type: todo
================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Send Erik his t-shirt gift', 'deadline': '2024-11-19T23:59:00', ...}
================================== Ai Message ==================================
I've updated your ToDo list to send Erik his t-shirt gift this weekend.
================================ Human Message =================================
Get cash and pay nanny for 2 weeks. Do this by Friday.
================================== Ai Message ==================================
Tool Calls:
UpdateMemory (call_Cq0Tfn6yqccHH8n0DOucz5OQ)
Call ID: call_Cq0Tfn6yqccHH8n0DOucz5OQ
Args:
update_type: todo
================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Get cash and pay nanny for 2 weeks', 'deadline': '2024-11-17T23:59:00', ...}
Document af1fe011-f3c5-4c1c-b98b-181869bc2944 updated:
Plan: Update the deadline for sending Erik his t-shirt gift to this weekend, which is by 2024-11-17.
================================== Ai Message ==================================
I've updated your ToDo list to ensure you get cash and pay the nanny for 2 weeks by Friday.
4.3 代码详解
关键参数:
multitask_strategy="enqueue"
执行流程:
1. first_run 创建并开始执行
↓
2. second_run 创建,自动加入队列
↓ (等待...)
3. first_run 完成
↓
4. second_run 自动从队列中取出并执行
↓
5. second_run 完成
关键点:
client.runs.join(thread_id, second_run_id)
会等待直到 second_run 完成- 这隐式地也等待了 first_run 完成
- 两个请求的消息都出现在线程历史中
4.4 观察结果
线程包含两个完整的对话轮次:
- 第一轮:添加 Erik 礼物待办
- 第二轮:添加付保姆钱待办
注意:第二个请求还更新了第一个待办的截止日期!
Document af1fe011-f3c5-4c1c-b98b-181869bc2944 updated:
Plan: Update the deadline for sending Erik his t-shirt gift to this weekend, which is by 2024-11-17.
这说明:
- 第二个运行可以访问第一个运行的结果
- Store 中的数据在两个运行之间共享
- AI 能够关联两个请求的上下文
4.5 Enqueue 策略的特点
优点:
- ✅ 所有请求都会被处理
- ✅ 保持执行顺序
- ✅ 避免数据冲突
- ✅ 用户无感知(不需要处理错误)
缺点:
- ⚠️ 第二个请求需要等待
- ⚠️ 如果第一个请求很慢,第二个会延迟
- ⚠️ 队列可能会变长
4.6 使用场景
适合 Enqueue 的场景:
批量操作
python用户快速输入: "添加待办:买牛奶" "添加待办:打电话给妈妈" "添加待办:预约牙医" → 全部排队,依次处理
相关请求
python用户: "总结我的待办" "先做哪个?" → 第二个请求依赖第一个的结果
不能丢失任何请求
python客户服务:所有用户消息都必须回复 表单提交:所有数据都必须保存
实际应用示例:
# 聊天机器人示例
async def handle_user_messages(client, thread_id, messages):
"""处理用户的多条消息,使用 enqueue 确保都被处理"""
runs = []
for message in messages:
run = await client.runs.create(
thread_id,
"chatbot",
input={"messages": [HumanMessage(content=message)]},
config=config,
multitask_strategy="enqueue"
)
runs.append(run)
# 等待所有运行完成
for run in runs:
await client.runs.join(thread_id, run["run_id"])
return runs
五、策略 3:Interrupt(中断)
5.1 概念
Interrupt 策略:中断当前运行,但保存已完成的工作,然后执行新请求。
用户请求 1
↓
Run 1 开始执行
↓ (执行了一部分...)
↓ (已完成的工作被保存)
用户请求 2
↓
❗ 中断 Run 1
↓
保存 Run 1 的进度
↓
Run 2 开始执行
↓
Run 2 完成
↓
Run 1 的部分工作和 Run 2 的完整工作都在历史中
5.2 实现代码
import asyncio
# 创建新线程
thread = await client.threads.create()
# 准备两个请求
user_input_1 = "Give me a summary of my ToDos due tomrrow."
user_input_2 = "Never mind, create a ToDo to Order Ham for Thanksgiving by next Friday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"
# 创建第一个运行
interrupted_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_1)]},
config=config,
)
# 等待一秒,让第一个运行执行一部分
await asyncio.sleep(1)
# 创建第二个运行(会中断第一个)
second_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_2)]},
config=config,
multitask_strategy="interrupt", # 中断策略
)
# 等待第二个运行完成
await client.runs.join(thread["thread_id"], second_run["run_id"])
# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()
输出:
================================ Human Message =================================
Give me a summary of my ToDos due tomrrow.
================================ Human Message =================================
Never mind, create a ToDo to Order Ham for Thanksgiving by next Friday.
================================== Ai Message ==================================
Tool Calls:
UpdateMemory (call_Rk80tTSJzik2oY44tyUWk8FM)
Call ID: call_Rk80tTSJzik2oY44tyUWk8FM
Args:
update_type: todo
================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Order Ham for Thanksgiving', 'deadline': '2024-11-22T23:59:59', ...}
================================== Ai Message ==================================
I've added the task "Order Ham for Thanksgiving" to your ToDo list.
5.3 代码详解
关键点 1:await asyncio.sleep(1)
await asyncio.sleep(1)
- 等待 1 秒
- 给第一个运行一些执行时间
- 确保第一个运行已经开始(可能部分完成)
如果不等待,第一个运行可能还没开始就被中断了。
关键点 2:multitask_strategy="interrupt"
multitask_strategy="interrupt"
- 中断当前正在执行的运行
- 保存已完成的工作
- 开始执行新运行
5.4 验证中断状态
# 确认第一个运行被中断
status = (await client.runs.get(thread["thread_id"], interrupted_run["run_id"]))["status"]
print(status)
输出:
interrupted
运行状态说明:
interrupted
:运行被中断- 与
success
、error
、pending
不同 - 表示运行没有完成,但工作被保存
5.5 观察结果
线程包含:
- ✅ 第一个请求的输入(Human Message)
- ❌ 第一个请求的输出(被中断,没有完成)
- ✅ 第二个请求的完整对话
解释:
第一个运行:
- 输入消息被保存(已经加入 state)
- AI 响应未生成(被中断)
第二个运行:
- 从当前 state 继续
- 看到了第一个请求的输入
- 生成完整响应
5.6 Interrupt vs Enqueue
关键区别:
对比点 | Enqueue | Interrupt |
---|---|---|
第一个运行 | ✅ 完整执行 | ⚠️ 被中断 |
第一个结果 | ✅ 有完整响应 | ❌ 没有响应 |
第二个运行 | ✅ 在第一个之后 | ✅ 立即执行 |
历史记录 | 两个完整对话 | 一个完整 + 一个部分 |
执行时间 | 第一个 + 第二个 | 约等于第二个 |
5.7 Interrupt 策略的特点
优点:
- ✅ 快速响应新请求
- ✅ 保存第一个请求的输入(上下文保留)
- ✅ 避免浪费时间在过时的请求上
- ✅ 用户能立即看到新的响应
缺点:
- ⚠️ 第一个请求没有完整结果
- ⚠️ 第一个请求的部分工作可能浪费
- ⚠️ 可能导致状态不一致(如果有副作用)
5.8 使用场景
适合 Interrupt 的场景:
用户纠正错误
用户:"预订飞往纽约的机票" ↓ (AI 正在搜索...) 用户:"等等,我是说洛杉矶!" ↓ (中断,立即改为洛杉矶)
用户补充信息
用户:"总结我的待办" ↓ (AI 正在整理...) 用户:"只看明天到期的" ↓ (中断,重新筛选)
用户改变主意
用户:"给我推荐一些餐厅" ↓ (AI 正在搜索...) 用户:"算了,我想知道电影院" ↓ (中断,改为搜索电影院)
优先级变化
用户:"分析这个大文件" ↓ (处理中...) 用户:"快速查一下今天的天气" ↓ (中断大任务,先回答天气)
不适合 Interrupt 的场景:
- ❌ 有副作用的操作(如已发送邮件、已扣款)
- ❌ 不可逆的操作
- ❌ 需要所有请求都完整执行的场景
六、策略 4:Rollback(回滚)
6.1 概念
Rollback 策略:完全删除第一个运行,就像它从未发生过,只执行第二个请求。
用户请求 1
↓
Run 1 开始执行
↓
用户请求 2
↓
❌ 删除 Run 1(包括输入)
↓
Run 2 开始执行
↓
Run 2 完成
↓
线程中只有 Run 2,Run 1 完全消失
6.2 实现代码
# 创建新线程
thread = await client.threads.create()
# 准备两个请求
user_input_1 = "Add a ToDo to call to make appointment at Yoga."
user_input_2 = "Actually, add a ToDo to drop by Yoga in person on Sunday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"
# 创建第一个运行
rolled_back_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_1)]},
config=config,
)
# 创建第二个运行(会回滚第一个)
second_run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input_2)]},
config=config,
multitask_strategy="rollback", # 回滚策略
)
# 等待第二个运行完成
await client.runs.join(thread["thread_id"], second_run["run_id"])
# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
m.pretty_print()
输出:
================================ Human Message =================================
Actually, add a ToDo to drop by Yoga in person on Sunday.
================================== Ai Message ==================================
It looks like the task "Drop by Yoga in person" is already on your ToDo list
with a deadline of November 19, 2024. Would you like me to update the deadline
to the upcoming Sunday instead?
6.3 代码详解
关键参数:
multitask_strategy="rollback"
结果观察:
- ❌ 第一个请求完全消失
- ❌ 连输入消息都没有
- ✅ 只有第二个请求的完整对话
6.4 验证回滚
# 确认第一个运行被删除
try:
await client.runs.get(thread["thread_id"], rolled_back_run["run_id"])
except httpx.HTTPStatusError as _:
print("Original run was correctly deleted")
输出:
Original run was correctly deleted
尝试获取第一个运行的信息会抛出 HTTP 404 错误,因为它已经被完全删除了。
6.5 Rollback vs Interrupt
关键区别:
对比点 | Interrupt | Rollback |
---|---|---|
第一个输入 | ✅ 保留 | ❌ 删除 |
第一个状态 | 保存 | 删除 |
线程历史 | 包含第一个输入 | 只有第二个请求 |
Run 记录 | status = interrupted | 完全不存在 |
可恢复性 | 可以查看被中断的运行 | 无法查看第一个运行 |
直观对比:
Interrupt:
Thread History
├── Human: "总结待办" (第一个请求的输入)
└── Human: "创建新待办" (第二个请求)
└── AI: "已创建"
Rollback:
Thread History
└── Human: "创建新待办" (只有第二个请求)
└── AI: "已创建"
6.6 Rollback 策略的特点
优点:
- ✅ 历史最干净
- ✅ 就像第一个请求从未发生
- ✅ 避免混淆(没有未完成的请求)
- ✅ 节省存储空间
缺点:
- ❌ 第一个请求的上下文完全丢失
- ❌ 无法追踪用户改变主意的历史
- ❌ 不可逆(无法恢复第一个请求)
- ❌ 可能丢失有价值的信息
6.7 使用场景
适合 Rollback 的场景:
用户完全改变主意
用户:"预订去巴黎的酒店" ↓ 用户:"不对,改成东京" ↓ (完全不同的请求,旧的没用了)
输入错误
用户:"添加待办:买牛奶...123ABC" ↓ (误触发送,输入未完成) 用户:"添加待办:买牛奶和面包" ↓ (完整正确的输入)
测试/实验
用户:"试试这个功能" ↓ (只是测试) 用户:"正式开始工作" ↓ (真正的任务)
用户想要"重新开始"
聊天机器人: 用户:"你好" 用户:"不对,直接说正事" ↓ (用户不想要寒暄)
不适合 Rollback 的场景:
- ❌ 第一个请求的上下文对第二个请求有用
- ❌ 需要审计追踪(audit trail)
- ❌ 第一个请求可能有副作用
七、策略对比和选择
7.1 详细对比表
特性 | Reject | Enqueue | Interrupt | Rollback |
---|---|---|---|---|
第一个运行 | ✅ 完成 | ✅ 完成 | ⚠️ 中断 | ❌ 删除 |
第二个运行 | ❌ 拒绝 | ✅ 排队后完成 | ✅ 立即完成 | ✅ 立即完成 |
第一个输入 | ✅ 保留 | ✅ 保留 | ✅ 保留 | ❌ 删除 |
第一个输出 | ✅ 完整 | ✅ 完整 | ❌ 无 | ❌ 无 |
响应时间 | 快(只一个) | 慢(两个顺序) | 中(第二个立即) | 中(第二个立即) |
用户体验 | 需处理错误 | 无感知等待 | 快速响应 | 最干净 |
数据完整性 | ✅ 最高 | ✅ 高 | ⚠️ 中 | ⚠️ 低 |
副作用处理 | ✅ 安全 | ✅ 安全 | ⚠️ 需注意 | ❌ 不安全 |
可追溯性 | ✅ 完整 | ✅ 完整 | ✅ 中(有interrupted状态) | ❌ 无 |
7.2 决策树
用户发送第二条消息
↓
第一条消息还在处理中?
↓
YES
↓
问:第一条消息重要吗?
↓
YES → 问:能等吗?
↓ ↓
YES NO
↓ ↓
Enqueue Interrupt
(排队) (中断)
NO → 问:需要第一条的上下文吗?
↓ ↓
YES NO
↓ ↓
Interrupt Rollback
(中断) (回滚)
问:系统资源紧张吗?
↓
YES
↓
Reject
(拒绝)
7.3 场景选择指南
场景 1:聊天机器人
情况:用户在聊天
用户:"你好"
用户:"帮我查天气"
推荐:Enqueue
- 原因:两条消息都是独立的,都应该回复
- 用户期望:收到两条回复
场景 2:搜索/查询
情况:用户在搜索
用户:"搜索巴黎酒店"
用户:"不对,搜索东京酒店"
推荐:Rollback
- 原因:用户改变了搜索目标
- 第一个搜索结果无用
场景 3:表单填写
情况:用户在填写表单
用户:"我的名字是..."
用户:"等等,我再想想"
推荐:Interrupt
- 原因:保留部分填写的内容
- 用户可能继续填写
场景 4:交易操作
情况:用户在转账
用户:"转账1000元到账户A"
用户:"不对,转到账户B"
推荐:Reject
- 原因:金融交易不能有任何不确定性
- 必须等第一个完全处理完
- 或者UI层面禁止快速连发
场景 5:文件处理
情况:用户上传文件处理
用户:"分析这个大文件"
用户:"等等,先看这个小文件"
推荐:Interrupt
- 原因:大文件处理耗时,小文件优先级更高
- 节省资源
7.4 Python 实现辅助函数
async def create_run_with_strategy(
client,
thread_id,
assistant_id,
input,
config,
scenario="chat"
):
"""根据场景自动选择策略"""
strategy_map = {
"chat": "enqueue", # 聊天:全部处理
"search": "rollback", # 搜索:最新优先
"form": "interrupt", # 表单:保留进度
"transaction": "reject", # 交易:严格顺序
"file": "interrupt" # 文件:可中断
}
strategy = strategy_map.get(scenario, "enqueue")
try:
return await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy=strategy
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 409:
# Reject 策略返回 409,等待后重试
await asyncio.sleep(1)
return await create_run_with_strategy(
client, thread_id, assistant_id, input, config, scenario
)
else:
raise
八、高级主题
8.1 动态策略选择
根据运行时条件动态选择策略:
async def adaptive_create_run(client, thread_id, assistant_id, input, config):
"""自适应策略选择"""
# 检查当前运行状态
runs = await client.runs.list(thread_id)
active_runs = [r for r in runs if r["status"] in ["pending", "running"]]
if not active_runs:
# 没有活跃运行,直接创建
strategy = None
elif len(active_runs) == 1:
# 一个活跃运行
run = active_runs[0]
# 检查运行时长
created_at = datetime.fromisoformat(run["created_at"])
elapsed = datetime.now(timezone.utc) - created_at
if elapsed.seconds < 5:
# 刚开始不久,可以回滚
strategy = "rollback"
elif elapsed.seconds < 30:
# 执行中,中断
strategy = "interrupt"
else:
# 执行很久了,等待或拒绝
strategy = "enqueue"
else:
# 多个活跃运行,拒绝
strategy = "reject"
return await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy=strategy
)
8.2 用户反馈机制
让用户选择如何处理:
async def create_run_with_user_choice(client, thread_id, assistant_id, input, config):
"""让用户选择策略"""
# 检查是否有活跃运行
runs = await client.runs.list(thread_id)
active_runs = [r for r in runs if r["status"] in ["pending", "running"]]
if active_runs:
# 询问用户
print("检测到正在处理的请求,你想要:")
print("1. 等待当前请求完成后处理 (Enqueue)")
print("2. 取消当前请求,立即处理新请求 (Interrupt)")
print("3. 取消当前请求并删除 (Rollback)")
print("4. 取消新请求 (Reject)")
choice = input("请选择 (1-4): ")
strategy_map = {
"1": "enqueue",
"2": "interrupt",
"3": "rollback",
"4": "reject"
}
strategy = strategy_map.get(choice, "enqueue")
else:
strategy = None
if strategy == "reject":
print("新请求已取消")
return None
return await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy=strategy
)
8.3 监控和日志
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def create_run_with_logging(
client, thread_id, assistant_id, input, config, strategy
):
"""带详细日志的运行创建"""
logger.info(f"Creating run with strategy: {strategy}")
logger.info(f"Thread: {thread_id}")
logger.info(f"Input: {input}")
try:
run = await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy=strategy
)
logger.info(f"Run created: {run['run_id']}")
logger.info(f"Status: {run['status']}")
return run
except httpx.HTTPStatusError as e:
if e.response.status_code == 409:
logger.warning(f"Conflict (409): Run rejected by {strategy} strategy")
else:
logger.error(f"HTTP error: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
九、最佳实践
9.1 选择策略的原则
1. 默认使用 Enqueue
# 对于大多数场景,enqueue 是最安全的选择
strategy = "enqueue"
2. 明确用户意图时使用 Rollback
# 用户明确说"不对"、"算了"、"改成"
if "不对" in user_input or "算了" in user_input:
strategy = "rollback"
3. 资源受限时使用 Reject
# API 调用次数有限、费用昂贵
if expensive_operation:
strategy = "reject"
4. 用户体验优先时使用 Interrupt
# 响应速度重要,旧请求可以中断
if user_priority == "speed":
strategy = "interrupt"
9.2 错误处理模板
async def robust_create_run(
client, thread_id, assistant_id, input, config, strategy="enqueue", max_retries=3
):
"""健壮的运行创建"""
for attempt in range(max_retries):
try:
return await client.runs.create(
thread_id,
assistant_id,
input=input,
config=config,
multitask_strategy=strategy
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 409:
# Conflict - 根据策略处理
if strategy == "reject":
if attempt < max_retries - 1:
logger.info(f"Retry attempt {attempt + 1}/{max_retries}")
await asyncio.sleep(2 ** attempt) # 指数退避
continue
else:
logger.error("Max retries reached")
raise
else:
# 其他策略不应该返回 409
logger.error(f"Unexpected 409 with strategy {strategy}")
raise
else:
logger.error(f"HTTP {e.response.status_code}: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
9.3 测试不同策略
async def test_all_strategies(client, assistant_id, config):
"""测试所有策略"""
strategies = ["reject", "enqueue", "interrupt", "rollback"]
for strategy in strategies:
print(f"\n=== Testing {strategy} ===")
# 创建线程
thread = await client.threads.create()
# 第一个请求
run1 = await client.runs.create(
thread["thread_id"],
assistant_id,
input={"messages": [HumanMessage(content="First request")]},
config=config
)
# 等待一点时间
await asyncio.sleep(0.5)
# 第二个请求
try:
run2 = await client.runs.create(
thread["thread_id"],
assistant_id,
input={"messages": [HumanMessage(content="Second request")]},
config=config,
multitask_strategy=strategy
)
print(f"Run 2 created: {run2['run_id']}")
except httpx.HTTPStatusError as e:
print(f"Run 2 rejected: {e.response.status_code}")
# 等待完成
await asyncio.sleep(2)
# 检查结果
state = await client.threads.get_state(thread["thread_id"])
messages = state["values"]["messages"]
print(f"Total messages: {len(messages)}")
# 检查第一个运行状态
run1_status = (await client.runs.get(thread["thread_id"], run1["run_id"]))["status"]
print(f"Run 1 status: {run1_status}")
十、常见问题和解决方案
10.1 问题:Enqueue 队列过长
现象:用户发送很多消息,队列积压
解决方案:
# 检查队列长度
async def get_queue_length(client, thread_id):
runs = await client.runs.list(thread_id)
pending_runs = [r for r in runs if r["status"] in ["pending"]]
return len(pending_runs)
# 动态调整策略
async def smart_create_run(client, thread_id, assistant_id, input, config):
queue_length = await get_queue_length(client, thread_id)
if queue_length > 5:
# 队列过长,改用 rollback
strategy = "rollback"
else:
strategy = "enqueue"
return await client.runs.create(
thread_id, assistant_id, input=input, config=config,
multitask_strategy=strategy
)
10.2 问题:Interrupt 导致副作用丢失
现象:运行被中断,但已经发送了邮件
解决方案:
# 方案 1:使用 Enqueue 替代 Interrupt
# 对于有副作用的操作,不要使用 interrupt
# 方案 2:幂等性设计
# 确保操作可以安全重试
10.3 问题:Rollback 丢失重要上下文
现象:用户的第一条消息包含重要信息
解决方案:
# 方案 1:在 UI 层面确认
# "确定要取消之前的请求吗?"
# 方案 2:使用 Interrupt 替代 Rollback
# 保留第一个请求的输入
# 方案 3:合并两个请求
async def merge_requests(client, thread_id, assistant_id, request1, request2, config):
"""合并两个请求"""
merged_input = {
"messages": [
HumanMessage(content=f"Context: {request1}\n\nCurrent request: {request2}")
]
}
return await client.runs.create(
thread_id,
assistant_id,
input=merged_input,
config=config,
multitask_strategy="rollback"
)
十一、总结
11.1 核心要点
Double Texting 是生产环境的常见问题
- 用户不会等待
- 必须优雅处理并发请求
四种策略各有适用场景
- Reject:严格顺序控制
- Enqueue:所有请求都重要
- Interrupt:快速响应优先
- Rollback:最新请求优先
选择策略的关键因素
- 用户意图
- 请求重要性
- 响应速度要求
- 副作用考虑
没有万能策略
- 根据场景选择
- 可以动态调整
- 需要权衡利弊
11.2 快速参考
# Reject - 拒绝新请求
await client.runs.create(..., multitask_strategy="reject")
# 返回 409 Conflict
# Enqueue - 排队处理
await client.runs.create(..., multitask_strategy="enqueue")
# 两个请求都完成
# Interrupt - 中断当前
await client.runs.create(..., multitask_strategy="interrupt")
# 保存第一个输入,执行第二个
# Rollback - 回滚删除
await client.runs.create(..., multitask_strategy="rollback")
# 删除第一个,只执行第二个
11.3 决策速查表
你的需求 | 推荐策略 |
---|---|
所有请求都必须处理 | Enqueue |
新请求优先级更高 | Interrupt 或 Rollback |
第一个请求不能中断 | Reject 或 Enqueue |
需要快速响应 | Interrupt 或 Rollback |
有副作用操作 | Reject 或 Enqueue |
用户明确取消第一个 | Rollback |
保留所有历史 | Enqueue 或 Interrupt |
历史最干净 | Rollback |
11.4 下节预告
在 6.4-assistant 中,我们将学习:
- 什么是 Assistants(助手)
- 如何创建和管理 assistants
- 配置不同的 assistants
- Assistants 的版本控制
- 实际应用场景
文档版本:1.0 最后更新:2024-11-05 作者:AI Assistant 基于:LangChain Academy Module-6 Lesson 6.3
恭喜你掌握了 Double Texting 的所有处理策略!现在你可以构建能优雅处理并发请求的生产级应用了。🎉