Skip to content

6.3 Double Texting - 详细解读

一、概述

1.1 本节简介

在真实的生产环境中,用户经常会快速连续发送多条消息,而不是耐心等待 AI 回复完成。这种行为被称为 Double Texting(双击发送/连发消息)。

典型场景

用户:"帮我添加一个待办事项..."
      ↓ (AI 正在处理)
用户:"哦,等等,还要添加另一个..."
      ↓ (第一个还没完成!)
用户:"算了,改成..."

如果系统不能优雅地处理这种情况,会导致:

  • 请求冲突
  • 数据不一致
  • 用户体验差
  • 系统崩溃

1.2 为什么 Double Texting 是个问题?

问题的本质并发访问同一个线程

Thread (线程)

Run 1 正在执行
    ↓ (修改状态)
Run 2 也想开始
    ↓ (也要修改状态)
冲突!

可能的后果

  1. 数据竞争(Race Condition)
  2. 状态不一致
  3. 部分更新丢失
  4. 执行顺序混乱

1.3 四种处理策略

LangGraph Platform 提供了四种策略来处理 double texting:

处理策略
├── 1. Reject(拒绝)
│   └── 拒绝新请求,返回错误

├── 2. Enqueue(排队)
│   └── 将新请求加入队列,顺序执行

├── 3. Interrupt(中断)
│   └── 中断当前运行,保存进度,执行新请求

└── 4. Rollback(回滚)
    └── 删除第一个运行,只执行第二个

1.4 策略对比速览

策略第一个请求第二个请求用户看到适用场景
Reject✅ 完成❌ 被拒绝只有第一个结果严格顺序控制
Enqueue✅ 完成✅ 完成两个结果都有所有请求都重要
Interrupt⚠️ 中断但保存✅ 完成第二个结果 + 第一个历史用户纠正/补充
Rollback❌ 删除✅ 完成只有第二个结果用户完全改变主意

1.5 学习目标

通过本节学习,你将掌握:

  1. 理解 Double Texting 问题

    • 什么是 double texting
    • 为什么需要处理
    • 实际应用场景
  2. 四种策略的使用

    • Reject:如何拒绝并发请求
    • Enqueue:如何排队处理
    • Interrupt:如何中断和继续
    • Rollback:如何回滚重来
  3. 策略选择

    • 不同场景选择不同策略
    • 权衡利弊
    • 最佳实践

二、环境准备

2.1 安装 SDK

python
%%capture --no-stderr
%pip install -U langgraph_sdk

2.2 连接到部署

python
from langgraph_sdk import get_client

url_for_cli_deployment = "http://localhost:8123"
client = get_client(url=url_for_cli_deployment)

前提条件

  • 部署已启动(docker compose up
  • task_maistro 图已部署
  • API 可访问

三、策略 1:Reject(拒绝)

3.1 概念

Reject 策略:拒绝任何新的并发请求,直到当前运行完成。

用户请求 1

Run 1 开始执行
    ↓ (执行中...)
用户请求 2

❌ 拒绝!返回 409 Conflict

Run 1 继续执行

Run 1 完成

现在可以接受新请求

3.2 实现代码

python
import httpx
from langchain_core.messages import HumanMessage

# 创建线程
thread = await client.threads.create()

# 准备两个请求
user_input_1 = "Add a ToDo to follow-up with DI Repairs."
user_input_2 = "Add a ToDo to mount dresser to the wall."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"

# 创建第一个运行
run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_1)]},
    config=config,
)

# 尝试创建第二个运行(会被拒绝)
try:
    await client.runs.create(
        thread["thread_id"],
        graph_name,
        input={"messages": [HumanMessage(content=user_input_2)]},
        config=config,
        multitask_strategy="reject",  # 关键参数!
    )
except httpx.HTTPStatusError as e:
    print("Failed to start concurrent run", e)

输出

Failed to start concurrent run Client error '409 Conflict' for url 'http://localhost:8123/threads/2b58630e-00fd-4c35-afad-a6b59e9b9104/runs'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/409

3.3 代码详解

关键参数

python
multitask_strategy="reject"

这个参数告诉服务器:

  • 如果线程上已有运行在执行
  • 拒绝这个新请求
  • 返回 HTTP 409 Conflict

HTTP 状态码 409

  • 含义:Conflict(冲突)
  • 场景:资源状态冲突
  • 处理:客户端应该等待并重试

3.4 验证第一个运行完成

python
from langchain_core.messages import convert_to_messages

# 等待第一个运行完成
await client.runs.join(thread["thread_id"], run["run_id"])

# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()

输出

================================ Human Message =================================
Add a ToDo to follow-up with DI Repairs.

================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_6xqHubCPNufS0bg4tbUxC0FU)
 Call ID: call_6xqHubCPNufS0bg4tbUxC0FU
  Args:
    update_type: todo

================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Follow-up with DI Repairs', 'time_to_complete': 30, 'deadline': None, ...}

================================== Ai Message ==================================
I've added a task to follow-up with DI Repairs to your ToDo list.

观察

  • ✅ 第一个请求成功完成
  • ❌ 第二个请求被拒绝,没有在历史中
  • 线程只包含第一个请求的内容

3.5 Reject 策略的特点

优点

  • ✅ 简单明了
  • ✅ 避免冲突
  • ✅ 保持执行顺序
  • ✅ 资源使用可控

缺点

  • ❌ 用户的第二个请求丢失
  • ❌ 需要客户端处理 409 错误
  • ❌ 用户体验可能不好(请求被拒绝)

3.6 使用场景

适合 Reject 的场景

  1. 严格顺序要求

    银行转账:必须逐个处理,不能并发
  2. 资源受限

    昂贵的 API 调用:避免浪费资源
  3. 明确告知用户等待

    UI 显示"正在处理中,请稍候..."
    禁用发送按钮

客户端处理示例

python
async def create_run_with_retry(client, thread_id, assistant_id, input, config, max_retries=3):
    """创建运行,如果遇到 409 则重试"""
    for attempt in range(max_retries):
        try:
            return await client.runs.create(
                thread_id,
                assistant_id,
                input=input,
                config=config,
                multitask_strategy="reject"
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 409 and attempt < max_retries - 1:
                print(f"Conflict, retrying... (attempt {attempt + 1})")
                await asyncio.sleep(1)  # 等待 1 秒
            else:
                raise

四、策略 2:Enqueue(排队)

4.1 概念

Enqueue 策略:将新请求加入队列,等当前运行完成后按顺序执行。

用户请求 1

Run 1 开始执行

用户请求 2

✅ 接受并加入队列

Run 1 完成

Run 2 自动开始

Run 2 完成

两个请求都处理完成

4.2 实现代码

python
# 创建新线程
thread = await client.threads.create()

# 准备两个请求
user_input_1 = "Send Erik his t-shirt gift this weekend."
user_input_2 = "Get cash and pay nanny for 2 weeks. Do this by Friday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"

# 创建第一个运行
first_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_1)]},
    config=config,
)

# 创建第二个运行(会被排队)
second_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_2)]},
    config=config,
    multitask_strategy="enqueue",  # 排队策略
)

# 等待第二个运行完成(会自动等第一个完成)
await client.runs.join(thread["thread_id"], second_run["run_id"])

# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()

输出

================================ Human Message =================================
Send Erik his t-shirt gift this weekend.

================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_svTeXPmWGTLY8aQ8EifjwHAa)
 Call ID: call_svTeXPmWGTLY8aQ8EifjwHAa
  Args:
    update_type: todo

================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Send Erik his t-shirt gift', 'deadline': '2024-11-19T23:59:00', ...}

================================== Ai Message ==================================
I've updated your ToDo list to send Erik his t-shirt gift this weekend.

================================ Human Message =================================
Get cash and pay nanny for 2 weeks. Do this by Friday.

================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_Cq0Tfn6yqccHH8n0DOucz5OQ)
 Call ID: call_Cq0Tfn6yqccHH8n0DOucz5OQ
  Args:
    update_type: todo

================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Get cash and pay nanny for 2 weeks', 'deadline': '2024-11-17T23:59:00', ...}

Document af1fe011-f3c5-4c1c-b98b-181869bc2944 updated:
Plan: Update the deadline for sending Erik his t-shirt gift to this weekend, which is by 2024-11-17.

================================== Ai Message ==================================
I've updated your ToDo list to ensure you get cash and pay the nanny for 2 weeks by Friday.

4.3 代码详解

关键参数

python
multitask_strategy="enqueue"

执行流程

1. first_run 创建并开始执行

2. second_run 创建,自动加入队列
   ↓ (等待...)
3. first_run 完成

4. second_run 自动从队列中取出并执行

5. second_run 完成

关键点

  • client.runs.join(thread_id, second_run_id) 会等待直到 second_run 完成
  • 这隐式地也等待了 first_run 完成
  • 两个请求的消息都出现在线程历史中

4.4 观察结果

线程包含两个完整的对话轮次

  1. 第一轮:添加 Erik 礼物待办
  2. 第二轮:添加付保姆钱待办

注意:第二个请求还更新了第一个待办的截止日期!

Document af1fe011-f3c5-4c1c-b98b-181869bc2944 updated:
Plan: Update the deadline for sending Erik his t-shirt gift to this weekend, which is by 2024-11-17.

这说明:

  • 第二个运行可以访问第一个运行的结果
  • Store 中的数据在两个运行之间共享
  • AI 能够关联两个请求的上下文

4.5 Enqueue 策略的特点

优点

  • ✅ 所有请求都会被处理
  • ✅ 保持执行顺序
  • ✅ 避免数据冲突
  • ✅ 用户无感知(不需要处理错误)

缺点

  • ⚠️ 第二个请求需要等待
  • ⚠️ 如果第一个请求很慢,第二个会延迟
  • ⚠️ 队列可能会变长

4.6 使用场景

适合 Enqueue 的场景

  1. 批量操作

    python
    用户快速输入:
    "添加待办:买牛奶"
    "添加待办:打电话给妈妈"
    "添加待办:预约牙医"
    
    → 全部排队,依次处理
  2. 相关请求

    python
    用户:
    "总结我的待办"
    "先做哪个?"
    
    → 第二个请求依赖第一个的结果
  3. 不能丢失任何请求

    python
    客户服务:所有用户消息都必须回复
    表单提交:所有数据都必须保存

实际应用示例

python
# 聊天机器人示例
async def handle_user_messages(client, thread_id, messages):
    """处理用户的多条消息,使用 enqueue 确保都被处理"""
    runs = []
    for message in messages:
        run = await client.runs.create(
            thread_id,
            "chatbot",
            input={"messages": [HumanMessage(content=message)]},
            config=config,
            multitask_strategy="enqueue"
        )
        runs.append(run)

    # 等待所有运行完成
    for run in runs:
        await client.runs.join(thread_id, run["run_id"])

    return runs

五、策略 3:Interrupt(中断)

5.1 概念

Interrupt 策略:中断当前运行,但保存已完成的工作,然后执行新请求。

用户请求 1

Run 1 开始执行
    ↓ (执行了一部分...)
    ↓ (已完成的工作被保存)
用户请求 2

❗ 中断 Run 1

保存 Run 1 的进度

Run 2 开始执行

Run 2 完成

Run 1 的部分工作和 Run 2 的完整工作都在历史中

5.2 实现代码

python
import asyncio

# 创建新线程
thread = await client.threads.create()

# 准备两个请求
user_input_1 = "Give me a summary of my ToDos due tomrrow."
user_input_2 = "Never mind, create a ToDo to Order Ham for Thanksgiving by next Friday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"

# 创建第一个运行
interrupted_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_1)]},
    config=config,
)

# 等待一秒,让第一个运行执行一部分
await asyncio.sleep(1)

# 创建第二个运行(会中断第一个)
second_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_2)]},
    config=config,
    multitask_strategy="interrupt",  # 中断策略
)

# 等待第二个运行完成
await client.runs.join(thread["thread_id"], second_run["run_id"])

# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()

输出

================================ Human Message =================================
Give me a summary of my ToDos due tomrrow.

================================ Human Message =================================
Never mind, create a ToDo to Order Ham for Thanksgiving by next Friday.

================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_Rk80tTSJzik2oY44tyUWk8FM)
 Call ID: call_Rk80tTSJzik2oY44tyUWk8FM
  Args:
    update_type: todo

================================= Tool Message =================================
New ToDo created:
Content: {'task': 'Order Ham for Thanksgiving', 'deadline': '2024-11-22T23:59:59', ...}

================================== Ai Message ==================================
I've added the task "Order Ham for Thanksgiving" to your ToDo list.

5.3 代码详解

关键点 1await asyncio.sleep(1)

python
await asyncio.sleep(1)
  • 等待 1 秒
  • 给第一个运行一些执行时间
  • 确保第一个运行已经开始(可能部分完成)

如果不等待,第一个运行可能还没开始就被中断了。

关键点 2multitask_strategy="interrupt"

python
multitask_strategy="interrupt"
  • 中断当前正在执行的运行
  • 保存已完成的工作
  • 开始执行新运行

5.4 验证中断状态

python
# 确认第一个运行被中断
status = (await client.runs.get(thread["thread_id"], interrupted_run["run_id"]))["status"]
print(status)

输出

interrupted

运行状态说明

  • interrupted:运行被中断
  • successerrorpending 不同
  • 表示运行没有完成,但工作被保存

5.5 观察结果

线程包含

  1. ✅ 第一个请求的输入(Human Message)
  2. ❌ 第一个请求的输出(被中断,没有完成)
  3. ✅ 第二个请求的完整对话

解释

第一个运行:
- 输入消息被保存(已经加入 state)
- AI 响应未生成(被中断)

第二个运行:
- 从当前 state 继续
- 看到了第一个请求的输入
- 生成完整响应

5.6 Interrupt vs Enqueue

关键区别

对比点EnqueueInterrupt
第一个运行✅ 完整执行⚠️ 被中断
第一个结果✅ 有完整响应❌ 没有响应
第二个运行✅ 在第一个之后✅ 立即执行
历史记录两个完整对话一个完整 + 一个部分
执行时间第一个 + 第二个约等于第二个

5.7 Interrupt 策略的特点

优点

  • ✅ 快速响应新请求
  • ✅ 保存第一个请求的输入(上下文保留)
  • ✅ 避免浪费时间在过时的请求上
  • ✅ 用户能立即看到新的响应

缺点

  • ⚠️ 第一个请求没有完整结果
  • ⚠️ 第一个请求的部分工作可能浪费
  • ⚠️ 可能导致状态不一致(如果有副作用)

5.8 使用场景

适合 Interrupt 的场景

  1. 用户纠正错误

    用户:"预订飞往纽约的机票"
       ↓ (AI 正在搜索...)
    用户:"等等,我是说洛杉矶!"
       ↓ (中断,立即改为洛杉矶)
  2. 用户补充信息

    用户:"总结我的待办"
       ↓ (AI 正在整理...)
    用户:"只看明天到期的"
       ↓ (中断,重新筛选)
  3. 用户改变主意

    用户:"给我推荐一些餐厅"
       ↓ (AI 正在搜索...)
    用户:"算了,我想知道电影院"
       ↓ (中断,改为搜索电影院)
  4. 优先级变化

    用户:"分析这个大文件"
       ↓ (处理中...)
    用户:"快速查一下今天的天气"
       ↓ (中断大任务,先回答天气)

不适合 Interrupt 的场景

  • ❌ 有副作用的操作(如已发送邮件、已扣款)
  • ❌ 不可逆的操作
  • ❌ 需要所有请求都完整执行的场景

六、策略 4:Rollback(回滚)

6.1 概念

Rollback 策略:完全删除第一个运行,就像它从未发生过,只执行第二个请求。

用户请求 1

Run 1 开始执行

用户请求 2

❌ 删除 Run 1(包括输入)

Run 2 开始执行

Run 2 完成

线程中只有 Run 2,Run 1 完全消失

6.2 实现代码

python
# 创建新线程
thread = await client.threads.create()

# 准备两个请求
user_input_1 = "Add a ToDo to call to make appointment at Yoga."
user_input_2 = "Actually, add a ToDo to drop by Yoga in person on Sunday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro"

# 创建第一个运行
rolled_back_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_1)]},
    config=config,
)

# 创建第二个运行(会回滚第一个)
second_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_2)]},
    config=config,
    multitask_strategy="rollback",  # 回滚策略
)

# 等待第二个运行完成
await client.runs.join(thread["thread_id"], second_run["run_id"])

# 获取线程状态
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()

输出

================================ Human Message =================================
Actually, add a ToDo to drop by Yoga in person on Sunday.

================================== Ai Message ==================================
It looks like the task "Drop by Yoga in person" is already on your ToDo list
with a deadline of November 19, 2024. Would you like me to update the deadline
to the upcoming Sunday instead?

6.3 代码详解

关键参数

python
multitask_strategy="rollback"

结果观察

  • ❌ 第一个请求完全消失
  • ❌ 连输入消息都没有
  • ✅ 只有第二个请求的完整对话

6.4 验证回滚

python
# 确认第一个运行被删除
try:
    await client.runs.get(thread["thread_id"], rolled_back_run["run_id"])
except httpx.HTTPStatusError as _:
    print("Original run was correctly deleted")

输出

Original run was correctly deleted

尝试获取第一个运行的信息会抛出 HTTP 404 错误,因为它已经被完全删除了。

6.5 Rollback vs Interrupt

关键区别

对比点InterruptRollback
第一个输入✅ 保留❌ 删除
第一个状态保存删除
线程历史包含第一个输入只有第二个请求
Run 记录status = interrupted完全不存在
可恢复性可以查看被中断的运行无法查看第一个运行

直观对比

Interrupt:
Thread History
├── Human: "总结待办"  (第一个请求的输入)
└── Human: "创建新待办" (第二个请求)
    └── AI: "已创建"

Rollback:
Thread History
└── Human: "创建新待办" (只有第二个请求)
    └── AI: "已创建"

6.6 Rollback 策略的特点

优点

  • ✅ 历史最干净
  • ✅ 就像第一个请求从未发生
  • ✅ 避免混淆(没有未完成的请求)
  • ✅ 节省存储空间

缺点

  • ❌ 第一个请求的上下文完全丢失
  • ❌ 无法追踪用户改变主意的历史
  • ❌ 不可逆(无法恢复第一个请求)
  • ❌ 可能丢失有价值的信息

6.7 使用场景

适合 Rollback 的场景

  1. 用户完全改变主意

    用户:"预订去巴黎的酒店"
    
    用户:"不对,改成东京"
       ↓ (完全不同的请求,旧的没用了)
  2. 输入错误

    用户:"添加待办:买牛奶...123ABC"
       ↓ (误触发送,输入未完成)
    用户:"添加待办:买牛奶和面包"
       ↓ (完整正确的输入)
  3. 测试/实验

    用户:"试试这个功能"
       ↓ (只是测试)
    用户:"正式开始工作"
       ↓ (真正的任务)
  4. 用户想要"重新开始"

    聊天机器人:
    用户:"你好"
    用户:"不对,直接说正事"
       ↓ (用户不想要寒暄)

不适合 Rollback 的场景

  • ❌ 第一个请求的上下文对第二个请求有用
  • ❌ 需要审计追踪(audit trail)
  • ❌ 第一个请求可能有副作用

七、策略对比和选择

7.1 详细对比表

特性RejectEnqueueInterruptRollback
第一个运行✅ 完成✅ 完成⚠️ 中断❌ 删除
第二个运行❌ 拒绝✅ 排队后完成✅ 立即完成✅ 立即完成
第一个输入✅ 保留✅ 保留✅ 保留❌ 删除
第一个输出✅ 完整✅ 完整❌ 无❌ 无
响应时间快(只一个)慢(两个顺序)中(第二个立即)中(第二个立即)
用户体验需处理错误无感知等待快速响应最干净
数据完整性✅ 最高✅ 高⚠️ 中⚠️ 低
副作用处理✅ 安全✅ 安全⚠️ 需注意❌ 不安全
可追溯性✅ 完整✅ 完整✅ 中(有interrupted状态)❌ 无

7.2 决策树

用户发送第二条消息

第一条消息还在处理中?

   YES

问:第一条消息重要吗?

   YES → 问:能等吗?
          ↓              ↓
         YES            NO
          ↓              ↓
       Enqueue      Interrupt
          (排队)      (中断)

   NO → 问:需要第一条的上下文吗?
         ↓              ↓
        YES            NO
         ↓              ↓
      Interrupt    Rollback
       (中断)        (回滚)

问:系统资源紧张吗?

   YES

  Reject
  (拒绝)

7.3 场景选择指南

场景 1:聊天机器人

情况:用户在聊天

用户:"你好"
用户:"帮我查天气"

推荐Enqueue

  • 原因:两条消息都是独立的,都应该回复
  • 用户期望:收到两条回复

场景 2:搜索/查询

情况:用户在搜索

用户:"搜索巴黎酒店"
用户:"不对,搜索东京酒店"

推荐Rollback

  • 原因:用户改变了搜索目标
  • 第一个搜索结果无用

场景 3:表单填写

情况:用户在填写表单

用户:"我的名字是..."
用户:"等等,我再想想"

推荐Interrupt

  • 原因:保留部分填写的内容
  • 用户可能继续填写

场景 4:交易操作

情况:用户在转账

用户:"转账1000元到账户A"
用户:"不对,转到账户B"

推荐Reject

  • 原因:金融交易不能有任何不确定性
  • 必须等第一个完全处理完
  • 或者UI层面禁止快速连发

场景 5:文件处理

情况:用户上传文件处理

用户:"分析这个大文件"
用户:"等等,先看这个小文件"

推荐Interrupt

  • 原因:大文件处理耗时,小文件优先级更高
  • 节省资源

7.4 Python 实现辅助函数

python
async def create_run_with_strategy(
    client,
    thread_id,
    assistant_id,
    input,
    config,
    scenario="chat"
):
    """根据场景自动选择策略"""
    strategy_map = {
        "chat": "enqueue",      # 聊天:全部处理
        "search": "rollback",   # 搜索:最新优先
        "form": "interrupt",    # 表单:保留进度
        "transaction": "reject", # 交易:严格顺序
        "file": "interrupt"     # 文件:可中断
    }

    strategy = strategy_map.get(scenario, "enqueue")

    try:
        return await client.runs.create(
            thread_id,
            assistant_id,
            input=input,
            config=config,
            multitask_strategy=strategy
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 409:
            # Reject 策略返回 409,等待后重试
            await asyncio.sleep(1)
            return await create_run_with_strategy(
                client, thread_id, assistant_id, input, config, scenario
            )
        else:
            raise

八、高级主题

8.1 动态策略选择

根据运行时条件动态选择策略:

python
async def adaptive_create_run(client, thread_id, assistant_id, input, config):
    """自适应策略选择"""

    # 检查当前运行状态
    runs = await client.runs.list(thread_id)
    active_runs = [r for r in runs if r["status"] in ["pending", "running"]]

    if not active_runs:
        # 没有活跃运行,直接创建
        strategy = None
    elif len(active_runs) == 1:
        # 一个活跃运行
        run = active_runs[0]

        # 检查运行时长
        created_at = datetime.fromisoformat(run["created_at"])
        elapsed = datetime.now(timezone.utc) - created_at

        if elapsed.seconds < 5:
            # 刚开始不久,可以回滚
            strategy = "rollback"
        elif elapsed.seconds < 30:
            # 执行中,中断
            strategy = "interrupt"
        else:
            # 执行很久了,等待或拒绝
            strategy = "enqueue"
    else:
        # 多个活跃运行,拒绝
        strategy = "reject"

    return await client.runs.create(
        thread_id,
        assistant_id,
        input=input,
        config=config,
        multitask_strategy=strategy
    )

8.2 用户反馈机制

让用户选择如何处理:

python
async def create_run_with_user_choice(client, thread_id, assistant_id, input, config):
    """让用户选择策略"""

    # 检查是否有活跃运行
    runs = await client.runs.list(thread_id)
    active_runs = [r for r in runs if r["status"] in ["pending", "running"]]

    if active_runs:
        # 询问用户
        print("检测到正在处理的请求,你想要:")
        print("1. 等待当前请求完成后处理 (Enqueue)")
        print("2. 取消当前请求,立即处理新请求 (Interrupt)")
        print("3. 取消当前请求并删除 (Rollback)")
        print("4. 取消新请求 (Reject)")

        choice = input("请选择 (1-4): ")

        strategy_map = {
            "1": "enqueue",
            "2": "interrupt",
            "3": "rollback",
            "4": "reject"
        }
        strategy = strategy_map.get(choice, "enqueue")
    else:
        strategy = None

    if strategy == "reject":
        print("新请求已取消")
        return None

    return await client.runs.create(
        thread_id,
        assistant_id,
        input=input,
        config=config,
        multitask_strategy=strategy
    )

8.3 监控和日志

python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def create_run_with_logging(
    client, thread_id, assistant_id, input, config, strategy
):
    """带详细日志的运行创建"""

    logger.info(f"Creating run with strategy: {strategy}")
    logger.info(f"Thread: {thread_id}")
    logger.info(f"Input: {input}")

    try:
        run = await client.runs.create(
            thread_id,
            assistant_id,
            input=input,
            config=config,
            multitask_strategy=strategy
        )

        logger.info(f"Run created: {run['run_id']}")
        logger.info(f"Status: {run['status']}")

        return run

    except httpx.HTTPStatusError as e:
        if e.response.status_code == 409:
            logger.warning(f"Conflict (409): Run rejected by {strategy} strategy")
        else:
            logger.error(f"HTTP error: {e}")
        raise

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise

九、最佳实践

9.1 选择策略的原则

1. 默认使用 Enqueue

python
# 对于大多数场景,enqueue 是最安全的选择
strategy = "enqueue"

2. 明确用户意图时使用 Rollback

python
# 用户明确说"不对"、"算了"、"改成"
if "不对" in user_input or "算了" in user_input:
    strategy = "rollback"

3. 资源受限时使用 Reject

python
# API 调用次数有限、费用昂贵
if expensive_operation:
    strategy = "reject"

4. 用户体验优先时使用 Interrupt

python
# 响应速度重要,旧请求可以中断
if user_priority == "speed":
    strategy = "interrupt"

9.2 错误处理模板

python
async def robust_create_run(
    client, thread_id, assistant_id, input, config, strategy="enqueue", max_retries=3
):
    """健壮的运行创建"""

    for attempt in range(max_retries):
        try:
            return await client.runs.create(
                thread_id,
                assistant_id,
                input=input,
                config=config,
                multitask_strategy=strategy
            )

        except httpx.HTTPStatusError as e:
            if e.response.status_code == 409:
                # Conflict - 根据策略处理
                if strategy == "reject":
                    if attempt < max_retries - 1:
                        logger.info(f"Retry attempt {attempt + 1}/{max_retries}")
                        await asyncio.sleep(2 ** attempt)  # 指数退避
                        continue
                    else:
                        logger.error("Max retries reached")
                        raise
                else:
                    # 其他策略不应该返回 409
                    logger.error(f"Unexpected 409 with strategy {strategy}")
                    raise
            else:
                logger.error(f"HTTP {e.response.status_code}: {e}")
                raise

        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise

9.3 测试不同策略

python
async def test_all_strategies(client, assistant_id, config):
    """测试所有策略"""

    strategies = ["reject", "enqueue", "interrupt", "rollback"]

    for strategy in strategies:
        print(f"\n=== Testing {strategy} ===")

        # 创建线程
        thread = await client.threads.create()

        # 第一个请求
        run1 = await client.runs.create(
            thread["thread_id"],
            assistant_id,
            input={"messages": [HumanMessage(content="First request")]},
            config=config
        )

        # 等待一点时间
        await asyncio.sleep(0.5)

        # 第二个请求
        try:
            run2 = await client.runs.create(
                thread["thread_id"],
                assistant_id,
                input={"messages": [HumanMessage(content="Second request")]},
                config=config,
                multitask_strategy=strategy
            )
            print(f"Run 2 created: {run2['run_id']}")
        except httpx.HTTPStatusError as e:
            print(f"Run 2 rejected: {e.response.status_code}")

        # 等待完成
        await asyncio.sleep(2)

        # 检查结果
        state = await client.threads.get_state(thread["thread_id"])
        messages = state["values"]["messages"]
        print(f"Total messages: {len(messages)}")

        # 检查第一个运行状态
        run1_status = (await client.runs.get(thread["thread_id"], run1["run_id"]))["status"]
        print(f"Run 1 status: {run1_status}")

十、常见问题和解决方案

10.1 问题:Enqueue 队列过长

现象:用户发送很多消息,队列积压

解决方案

python
# 检查队列长度
async def get_queue_length(client, thread_id):
    runs = await client.runs.list(thread_id)
    pending_runs = [r for r in runs if r["status"] in ["pending"]]
    return len(pending_runs)

# 动态调整策略
async def smart_create_run(client, thread_id, assistant_id, input, config):
    queue_length = await get_queue_length(client, thread_id)

    if queue_length > 5:
        # 队列过长,改用 rollback
        strategy = "rollback"
    else:
        strategy = "enqueue"

    return await client.runs.create(
        thread_id, assistant_id, input=input, config=config,
        multitask_strategy=strategy
    )

10.2 问题:Interrupt 导致副作用丢失

现象:运行被中断,但已经发送了邮件

解决方案

python
# 方案 1:使用 Enqueue 替代 Interrupt
# 对于有副作用的操作,不要使用 interrupt

# 方案 2:幂等性设计
# 确保操作可以安全重试

10.3 问题:Rollback 丢失重要上下文

现象:用户的第一条消息包含重要信息

解决方案

python
# 方案 1:在 UI 层面确认
# "确定要取消之前的请求吗?"

# 方案 2:使用 Interrupt 替代 Rollback
# 保留第一个请求的输入

# 方案 3:合并两个请求
async def merge_requests(client, thread_id, assistant_id, request1, request2, config):
    """合并两个请求"""
    merged_input = {
        "messages": [
            HumanMessage(content=f"Context: {request1}\n\nCurrent request: {request2}")
        ]
    }
    return await client.runs.create(
        thread_id,
        assistant_id,
        input=merged_input,
        config=config,
        multitask_strategy="rollback"
    )

十一、总结

11.1 核心要点

  1. Double Texting 是生产环境的常见问题

    • 用户不会等待
    • 必须优雅处理并发请求
  2. 四种策略各有适用场景

    • Reject:严格顺序控制
    • Enqueue:所有请求都重要
    • Interrupt:快速响应优先
    • Rollback:最新请求优先
  3. 选择策略的关键因素

    • 用户意图
    • 请求重要性
    • 响应速度要求
    • 副作用考虑
  4. 没有万能策略

    • 根据场景选择
    • 可以动态调整
    • 需要权衡利弊

11.2 快速参考

python
# Reject - 拒绝新请求
await client.runs.create(..., multitask_strategy="reject")
# 返回 409 Conflict

# Enqueue - 排队处理
await client.runs.create(..., multitask_strategy="enqueue")
# 两个请求都完成

# Interrupt - 中断当前
await client.runs.create(..., multitask_strategy="interrupt")
# 保存第一个输入,执行第二个

# Rollback - 回滚删除
await client.runs.create(..., multitask_strategy="rollback")
# 删除第一个,只执行第二个

11.3 决策速查表

你的需求推荐策略
所有请求都必须处理Enqueue
新请求优先级更高Interrupt 或 Rollback
第一个请求不能中断Reject 或 Enqueue
需要快速响应Interrupt 或 Rollback
有副作用操作Reject 或 Enqueue
用户明确取消第一个Rollback
保留所有历史Enqueue 或 Interrupt
历史最干净Rollback

11.4 下节预告

6.4-assistant 中,我们将学习:

  • 什么是 Assistants(助手)
  • 如何创建和管理 assistants
  • 配置不同的 assistants
  • Assistants 的版本控制
  • 实际应用场景

文档版本:1.0 最后更新:2024-11-05 作者:AI Assistant 基于:LangChain Academy Module-6 Lesson 6.3

恭喜你掌握了 Double Texting 的所有处理策略!现在你可以构建能优雅处理并发请求的生产级应用了。🎉

基于 MIT 许可证发布。内容版权归作者所有。