Skip to content

6.2 Connecting to a LangGraph Platform Deployment - 详细解读

一、概述

1.1 本节简介

在 6.1 节中,我们学会了如何创建 LangGraph 部署。现在在 6.2 节中,我们将学习如何连接和使用这个部署。这是从开发者视角转向用户视角的重要一步。

核心问题

  • 如何连接到部署的服务?
  • 如何执行图的运行?
  • 如何管理多轮对话?
  • 如何操作长期记忆?

1.2 部署后可以访问什么?

一旦部署成功,你可以通过三种方式访问:

┌─────────────────────────────────────────────────┐
│         LangGraph Platform 部署                  │
├─────────────────────────────────────────────────┤
│                                                  │
│  1. HTTP API                                    │
│     http://localhost:8123                       │
│     → 原始 REST API 端点                         │
│                                                  │
│  2. API 文档 (Swagger UI)                        │
│     http://localhost:8123/docs                  │
│     → 交互式 API 测试界面                         │
│                                                  │
│  3. LangGraph Studio                            │
│     https://smith.langchain.com/studio/         │
│     → 可视化调试和监控                            │
│                                                  │
└─────────────────────────────────────────────────┘

1.3 API 端点的三大类别

LangGraph Server 提供的 API 可以分为三大类:

API 端点分类
├── Runs(运行)
│   └── 原子化的图执行
│       - 单次执行
│       - 可以后台运行
│       - 可以流式传输

├── Threads(线程)
│   └── 多轮交互
│       - 对话历史
│       - 状态管理
│       - Human in the loop

└── Store(存储)
    └── 长期记忆
        - 跨线程数据
        - 用户记忆
        - 持久化存储

1.4 学习目标

通过本节学习,你将掌握:

  1. 连接到部署

    • 使用 LangGraph SDK
    • 使用 Remote Graph
  2. Runs(运行)

    • 后台运行(fire and forget)
    • 阻塞运行(blocking)
    • 流式运行(streaming)
  3. Threads(线程)

    • 查看线程状态
    • 复制线程
    • Human in the loop
    • 状态编辑和时间旅行
  4. Store API

    • 搜索记忆项
    • 添加记忆项
    • 删除记忆项

1.5 从开发到生产的转变

开发环境

python
# 直接调用图
result = graph.invoke({"messages": [HumanMessage("Hello")]})

生产环境

python
# 通过 HTTP API 调用
run = await client.runs.create(
    thread_id,
    "task_maistro",
    input={"messages": [HumanMessage("Hello")]}
)

二、连接到部署

2.1 部署启动确认

在开始之前,确保你的部署正在运行:

bash
# 确认 docker-compose 正在运行
$ cd module-6/deployment
$ docker compose ps

# 应该看到三个服务都在运行
NAME                              STATUS
deployment-langgraph-api-1        Up
deployment-langgraph-postgres-1   Up
deployment-langgraph-redis-1      Up

测试连接

bash
# 测试 API 健康状态
$ curl http://localhost:8123/health
{"status": "ok"}

2.2 方法一:使用 LangGraph SDK

LangGraph SDK 是推荐的连接方式,提供了 Python 和 JavaScript 两个版本。

2.2.1 安装 SDK

python
%%capture --no-stderr
%pip install -U langgraph_sdk

命令解析

  • %%capture --no-stderr:Jupyter 魔法命令,捕获输出但显示错误
  • -U:升级到最新版本
  • langgraph_sdk:SDK 包名(注意是下划线)

2.2.2 创建客户端

python
from langgraph_sdk import get_client

# 连接到部署
url_for_cli_deployment = "http://localhost:8123"
client = get_client(url=url_for_cli_deployment)

代码解析

  • get_client():创建 SDK 客户端
  • url:部署的 URL(可以是本地或远程)

客户端对象提供的功能

python
client.runs       # 运行管理
client.threads    # 线程管理
client.store      # 存储管理
client.assistants # 助手管理(6.4 节)

2.3 方法二:使用 Remote Graph

Remote Graph 允许你像使用本地图一样使用远程部署的图。

2.3.1 安装依赖

python
%%capture --no-stderr
%pip install -U langchain_openai langgraph langchain_core

2.3.2 创建 Remote Graph

python
from langgraph.pregel.remote import RemoteGraph
from langchain_core.messages import convert_to_messages
from langchain_core.messages import HumanMessage, SystemMessage

# 连接到远程图
url = "http://localhost:8123"
graph_name = "task_maistro"
remote_graph = RemoteGraph(graph_name, url=url)

参数说明

  • graph_name:图的名称(在 langgraph.json 中定义的)
  • url:部署的 URL

2.3.3 SDK vs Remote Graph

特性LangGraph SDKRemote Graph
用途全功能 API 客户端类似本地图的接口
适合场景生产环境,完整控制快速原型,最小改动
功能范围全部 API 功能图执行和流式传输
学习曲线需要了解 API 概念熟悉 LangGraph 即可
灵活性高(完全控制)中(简化接口)

何时使用哪个?

使用 SDK

  • 生产环境应用
  • 需要管理 threads、runs、store
  • 需要完整的控制和监控

使用 Remote Graph

  • 快速原型开发
  • 从本地图迁移到远程图
  • 只需要基本的图执行功能

三、Runs(运行)

3.1 什么是 Run?

Run(运行) = 图的一次完整执行

关键特性

  • 每个 run 有唯一的 ID
  • 结果存储在 PostgreSQL
  • 可以查询状态和结果
  • 可以追踪执行历史

Run 的生命周期

pending → running → success/error
  ↓         ↓            ↓
 创建      执行中        完成

3.2 Run 的类型

LangGraph Server 支持三种类型的 runs:

Run 类型
├── Background Run(后台运行)
│   └── Fire and forget
│       - 不等待完成
│       - 适合长时间任务
│       - 可以轮询状态

├── Blocking Run(阻塞运行)
│   └── Wait for completion
│       - 等待完成后返回
│       - 确保顺序执行
│       - 同步操作

└── Streaming Run(流式运行)
    └── Real-time updates
        - 实时接收更新
        - 流式传输 tokens
        - 改善用户体验

3.3 Background Runs(后台运行)

3.3.1 创建线程

python
# 创建一个新线程
thread = await client.threads.create()
thread

输出

python
{
    'thread_id': '7f71c0dd-768b-4e53-8349-42bdd10e7caf',
    'created_at': '2024-11-14T19:36:08.459457+00:00',
    'updated_at': '2024-11-14T19:36:08.459457+00:00',
    'metadata': {},
    'status': 'idle',
    'config': {},
    'values': None
}

字段说明

字段说明示例值
thread_id线程唯一标识UUID
created_at创建时间ISO 8601 格式
updated_at更新时间ISO 8601 格式
status当前状态idle/busy
metadata元数据字典
config配置字典
values当前状态值None(新线程)

3.3.2 检查线程上的运行

python
# 检查线程上是否有运行
thread = await client.threads.create()
runs = await client.runs.list(thread["thread_id"])
print(runs)

输出

python
[]
  • 新线程没有任何运行记录

3.3.3 创建后台运行

python
# 创建一些 ToDos
user_input = "Add a ToDo to finish booking travel to Hong Kong by end of next week. Also, add a ToDo to call parents back about Thanksgiving plans."
config = {"configurable": {"user_id": "Test"}}
graph_name = "task_maistro"

# 创建运行(后台执行)
run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input)]},
    config=config
)

参数详解

python
await client.runs.create(
    thread_id,         # 线程 ID
    assistant_id,      # 图/助手名称
    input={...},       # 输入数据
    config={...}       # 配置(如 user_id)
)

这是一个后台运行

  • 函数立即返回
  • 不等待执行完成
  • 图在后台异步执行

3.3.4 启动新运行并检查状态

python
# 创建新线程和新运行
thread = await client.threads.create()
user_input = "Give me a summary of all ToDos."
config = {"configurable": {"user_id": "Test"}}
graph_name = "task_maistro"

# 创建运行
run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input)]},
    config=config
)

# 检查运行状态
print(await client.runs.get(thread["thread_id"], run["run_id"]))

输出(简化)

python
{
    'run_id': '1efa2c00-63e4-6f4a-9c5b-ca3f5f9bff07',
    'thread_id': '641c195a-9e31-4250-a729-6b742c089df8',
    'status': 'pending',  # 仍在运行
    'created_at': '2024-11-14T19:38:29.394777+00:00',
    'updated_at': '2024-11-14T19:38:29.394777+00:00',
    ...
}

注意'status': 'pending' 表示运行仍在进行中。

Run 状态说明

状态含义说明
pending等待执行在队列中
running正在执行Queue worker 处理中
success成功完成正常结束
error执行失败发生错误
interrupted被中断人为中断
cancelled被取消主动取消

3.4 Blocking Runs(阻塞运行)

3.4.1 使用 client.runs.join

如果你想等待运行完成(阻塞运行),可以使用 client.runs.join

python
# 等待运行完成
await client.runs.join(thread["thread_id"], run["run_id"])

# 再次检查状态
print(await client.runs.get(thread["thread_id"], run["run_id"]))

输出

python
{
    'run_id': '1efa2c00-63e4-6f4a-9c5b-ca3f5f9bff07',
    'thread_id': '641c195a-9e31-4250-a729-6b742c089df8',
    'status': 'success',  # 已完成
    ...
}

client.runs.join 的作用

  • 阻塞当前执行
  • 等待指定 run 完成
  • 确保在该线程上不会启动新 run,直到当前 run 完成

使用场景

  • 需要确保顺序执行
  • 后续操作依赖当前结果
  • 测试和调试

3.5 Streaming Runs(流式运行)

3.5.1 流式传输的架构

客户端发起流式请求

HTTP Worker 生成 run_id

Queue Worker 开始执行
       ↓ (执行过程中)
Queue Worker 发布更新到 Redis

HTTP Worker 从 Redis 订阅更新

实时推送给客户端

执行完成,关闭连接

3.5.2 流式传输 tokens

最常用的流式传输方式是 streaming tokens(逐 token 返回),改善用户体验。

python
user_input = "What ToDo should I focus on first."

async for chunk in client.runs.stream(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input)]},
    config=config,
    stream_mode="messages-tuple"
):
    if chunk.event == "messages":
        print("".join(
            data_item['content']
            for data_item in chunk.data
            if 'content' in data_item
        ), end="", flush=True)

输出(实时显示)

You might want to focus on "Call parents back about Thanksgiving plans" first. It has a shorter estimated time to complete (15 minutes) and doesn't have a specific deadline, so it could be a quick task to check off your list. Once that's done, you can dedicate more time to "Finish booking travel to Hong Kong," which is more time-consuming and has a deadline.

代码详解

client.runs.stream()

python
async for chunk in client.runs.stream(
    thread_id,              # 线程 ID
    assistant_id,           # 图名称
    input={...},            # 输入
    config={...},           # 配置
    stream_mode="messages-tuple"  # 流式模式
):
    # 处理每个 chunk
    pass

stream_mode 选项

模式说明用途
values完整的状态值查看每个节点后的完整状态
updates状态更新查看每个节点的更新
messages消息列表查看消息流
messages-tuple消息元组(推荐)逐 token 流式传输
custom自定义流高级用例

打印逻辑

python
if chunk.event == "messages":
    # 提取所有包含 'content' 的 data_item
    content = "".join(
        data_item['content']
        for data_item in chunk.data
        if 'content' in data_item
    )
    # 实时打印,不换行
    print(content, end="", flush=True)

Python 知识点 - print 参数

python
print(text, end="", flush=True)
#           ^^^^^^  ^^^^^^^^^^
#           不换行   立即刷新缓冲区

# 默认行为:
print(text)  # 等价于 print(text, end="\n", flush=False)

# end="" 的作用:
print("Hello", end="")
print("World")
# 输出:HelloWorld

# flush=True 的作用:
# 立即输出,不等待缓冲区满
# 对于实时流式传输很重要

3.5.3 流式传输的优势

没有流式传输

用户发送请求

等待 30 秒...(用户看不到任何反馈)

一次性返回完整响应

有流式传输

用户发送请求

立即看到开始生成

逐字实时显示

用户体验更好

四、Threads(线程)

4.1 Run vs Thread

重要区别

Run(运行)                 Thread(线程)
    ↓                          ↓
一次执行                    多轮交互
无状态                      有状态
单次任务                    对话历史

类比

  • Run = 一次函数调用
  • Thread = 一个会话/对话

4.2 Thread 的持久化

当客户端使用 thread_id 执行图时:

  1. 服务器保存所有 checkpoints(检查点)到线程
  2. Checkpoints 存储在 PostgreSQL 中
  3. 每个节点执行后都会创建一个 checkpoint
  4. 可以从任何 checkpoint 继续执行

Checkpoint 的作用

  • 保存执行历史
  • 支持时间旅行(time travel)
  • 实现 Human in the loop
  • 错误恢复

4.3 检查线程状态

python
# 获取线程的当前状态
thread_state = await client.threads.get_state(thread['thread_id'])

# 打印消息历史
for m in convert_to_messages(thread_state['values']['messages']):
    m.pretty_print()

输出

================================ Human Message =================================
Give me a summary of all ToDos.

================================== Ai Message ==================================
Here's a summary of your current ToDo list:

1. **Task:** Finish booking travel to Hong Kong
   - **Status:** Not started
   - **Deadline:** November 22, 2024
   - **Solutions:**
     - Check flight prices on Skyscanner
     - Book hotel through Booking.com
     - Arrange airport transfer
   - **Estimated Time to Complete:** 120 minutes

2. **Task:** Call parents back about Thanksgiving plans
   - **Status:** Not started
   - **Deadline:** None
   - **Solutions:**
     - Check calendar for availability
     - Discuss travel arrangements
     - Confirm dinner plans
   - **Estimated Time to Complete:** 15 minutes

Let me know if there's anything else you'd like to do with your ToDo list!

================================ Human Message =================================
What ToDo should I focus on first.

================================== Ai Message ==================================
You might want to focus on "Call parents back about Thanksgiving plans" first...

分析

  • 线程保存了完整的对话历史
  • 包含所有用户输入和 AI 响应
  • 这是多轮对话的基础

4.4 复制线程(Fork)

复制线程允许你"分叉"对话,创建独立的分支。

python
# 复制线程
copied_thread = await client.threads.copy(thread['thread_id'])

# 检查复制的线程状态
copied_thread_state = await client.threads.get_state(copied_thread['thread_id'])
for m in convert_to_messages(copied_thread_state['values']['messages']):
    m.pretty_print()

输出:复制的线程包含完全相同的历史记录。

使用场景

原始线程

    ├─ Run 1: "给我 ToDo 总结"
    ├─ Run 2: "应该先做哪个?"

    ├─────────┬─────────┐
    │         │         │
复制线程 A  复制线程 B  原始线程继续
    │         │         │
    ↓         ↓         ↓
探索不同   测试新    正常使用
的对话    功能
路径

实际应用

  1. A/B 测试:测试不同的对话路径
  2. What-if 分析:探索不同的决策结果
  3. 并行实验:在不影响原始对话的情况下测试
  4. 快照保存:保存对话的某个时刻

4.5 Human in the Loop

4.5.1 概念回顾

在 Module 3 中,我们学习了 Human in the loop(人机协作):

  • 在执行过程中暂停
  • 人类检查和修改状态
  • 从修改后的状态继续执行

4.5.2 获取线程历史

python
# 获取线程的完整历史
states = await client.threads.get_history(thread['thread_id'])

# 选择一个历史状态来分叉
to_fork = states[-2]  # 倒数第二个状态
to_fork['values']

输出

python
{
    'messages': [
        {
            'content': 'Give me a summary of all ToDos.',
            'type': 'human',
            'id': '3680da45-e3a5-4a47-b5b1-4fd4d3e8baf9',
            ...
        }
    ]
}

states 列表结构

python
states = [
    state_0,  # 最新状态
    state_1,  # 上一个状态
    state_2,  # 更早的状态
    ...
]

# states[-1] = 最早的状态
# states[-2] = 倒数第二个状态
# states[0]  = 最新状态

4.5.3 检查状态详情

python
# 检查消息 ID
print(to_fork['values']['messages'][0]['id'])
# 输出:'3680da45-e3a5-4a47-b5b1-4fd4d3e8baf9'

# 检查下一个要执行的节点
print(to_fork['next'])
# 输出:['task_mAIstro']

# 检查 checkpoint ID
print(to_fork['checkpoint_id'])
# 输出:'1efa2c00-6609-67ff-8000-491b1dcf8129'

字段说明

字段说明用途
values当前状态值包含消息、数据等
next下一个要执行的节点知道从哪里继续
checkpoint_id检查点 ID用于时间旅行
metadata元数据额外信息

4.5.4 编辑状态

关键概念:消息 reducer 的工作方式

python
# 回顾 Module-2:消息 reducer
def add_messages(left, right):
    # 如果提供了消息 ID,则更新(覆盖)
    # 如果没有 ID,则追加
    ...

编辑状态的技巧

  1. 不提供 ID → 追加新消息
  2. 提供 ID → 覆盖现有消息
python
# 创建新消息,但使用旧消息的 ID(覆盖)
forked_input = {
    "messages": HumanMessage(
        content="Give me a summary of all ToDos that need to be done in the next week.",
        id=to_fork['values']['messages'][0]['id']  # 使用旧 ID
    )
}

# 更新状态,创建新的 checkpoint
forked_config = await client.threads.update_state(
    thread["thread_id"],
    forked_input,
    checkpoint_id=to_fork['checkpoint_id']  # 从这个 checkpoint 分叉
)

这行代码的作用

原始线程历史:
checkpoint_0 → checkpoint_1 → checkpoint_2

                └─ 从这里分叉
                   创建新分支:
                   checkpoint_1' → checkpoint_2' → ...

4.5.5 从新 checkpoint 继续执行

python
# 从新 checkpoint 运行图
async for chunk in client.runs.stream(
    thread["thread_id"],
    graph_name,
    input=None,  # 不需要新输入,使用更新后的状态
    config=config,
    checkpoint_id=forked_config['checkpoint_id'],  # 从这个 checkpoint 开始
    stream_mode="messages-tuple"
):
    if chunk.event == "messages":
        print("".join(
            data_item['content']
            for data_item in chunk.data
            if 'content' in data_item
        ), end="", flush=True)

输出

Here's a summary of your ToDos that need to be done in the next week:

1. **Finish booking travel to Hong Kong**
   - **Status:** Not started
   - **Deadline:** November 22, 2024
   - **Solutions:**
     - Check flight prices on Skyscanner
     - Book hotel through Booking.com
     - Arrange airport transfer
   - **Estimated Time to Complete:** 120 minutes

It looks like this task is due soon, so you might want to prioritize it. Let me know if there's anything else you need help with!

注意

  • AI 只返回了下周需要完成的任务
  • 没有返回没有截止日期的任务
  • 这证明了状态编辑成功了!

4.5.6 时间旅行(Time Travel)总结

完整的时间旅行流程:

1. 获取历史:get_history()

2. 选择检查点:states[-2]

3. 检查详情:next, checkpoint_id

4. 编辑状态:update_state() with message ID

5. 创建新分支:新的 checkpoint

6. 从新分支继续:stream() with checkpoint_id

实际应用场景

  1. 用户纠正错误

    用户:"预订香港的机票"
    AI:[错误理解]
    用户:← 回到上一步,重新输入
  2. 探索不同选项

    选择点 A
    ├─ 选项 1 → 结果 1
    ├─ 选项 2 → 结果 2
    └─ 选项 3 → 结果 3
  3. 调试和测试

    问题出现
    
    回到出错前的状态
    
    修改输入重新执行
    
    验证修复

五、跨线程记忆(Store API)

5.1 Store 的作用回顾

在 Module-5 中,我们学习了 Store(长期记忆):

  • 保存跨线程的信息
  • 按 namespace 组织
  • 持久化存储

开发 vs 生产

环境Store 实现
开发InMemoryStore(内存)
生产PostgreSQL(数据库)

5.2 task_maistro 的 Store 使用

task_maistro 图使用 Store 保存 ToDos:

Namespace 结构

python
("todo", "todo_category", "user_id")
# 例如:
("todo", "general", "Test")
("todo", "work", "lance")
("todo", "personal", "lance")

默认值

  • todo_category 默认为 "general"(在 deployment/configuration.py 中定义)

5.3 搜索记忆项

python
# 搜索特定 namespace 中的所有 items
items = await client.store.search_items(
    ("todo", "general", "Test"),
    limit=5,     # 最多返回 5 个
    offset=0     # 从第 0 个开始
)

items['items']

输出

python
[
    {
        'value': {
            'task': 'Finish booking travel to Hong Kong',
            'status': 'not started',
            'deadline': '2024-11-22T23:59:59',
            'solutions': [
                'Check flight prices on Skyscanner',
                'Book hotel through Booking.com',
                'Arrange airport transfer'
            ],
            'time_to_complete': 120
        },
        'key': '18524803-c182-49de-9b10-08ccb0a06843',
        'namespace': ['todo', 'general', 'Test'],
        'created_at': '2024-11-14T19:37:41.664827+00:00',
        'updated_at': '2024-11-14T19:37:41.664827+00:00'
    },
    {
        'value': {
            'task': 'Call parents back about Thanksgiving plans',
            'status': 'not started',
            'deadline': None,
            'solutions': [
                'Check calendar for availability',
                'Discuss travel arrangements',
                'Confirm dinner plans'
            ],
            'time_to_complete': 15
        },
        'key': '375d9596-edf8-4de2-985b-bacdc623d6ef',
        'namespace': ['todo', 'general', 'Test'],
        'created_at': '2024-11-14T19:37:41.664827+00:00',
        'updated_at': '2024-11-14T19:37:41.664827+00:00'
    }
]

字段说明

字段说明示例
value实际数据(字典)ToDo 的详细信息
key唯一标识(UUID)用于更新/删除
namespace命名空间["todo", "general", "Test"]
created_at创建时间ISO 8601 格式
updated_at更新时间ISO 8601 格式

search_items 参数

python
await client.store.search_items(
    namespace,     # 必需:命名空间元组
    limit=10,      # 可选:最大返回数量
    offset=0,      # 可选:偏移量(分页)
    filter={}      # 可选:过滤条件
)

分页示例

python
# 第一页(0-9)
page1 = await client.store.search_items(
    ("todo", "general", "Test"),
    limit=10,
    offset=0
)

# 第二页(10-19)
page2 = await client.store.search_items(
    ("todo", "general", "Test"),
    limit=10,
    offset=10
)

5.4 添加记忆项

在图的代码中,我们使用 store.put() 添加项。 使用 SDK,我们可以在图外部直接添加项:

python
from uuid import uuid4

# 直接添加项到 store
await client.store.put_item(
    ("testing", "Test"),          # namespace
    key=str(uuid4()),              # 生成唯一 key
    value={"todo": "Test SDK put_item"}  # 数据
)

验证添加

python
# 搜索刚添加的项
items = await client.store.search_items(
    ("testing", "Test"),
    limit=5,
    offset=0
)

items['items']

输出

python
[
    {
        'value': {'todo': 'Test SDK put_item'},
        'key': '3de441ba-8c79-4beb-8f52-00e4dcba68d4',
        'namespace': ['testing', 'Test'],
        'created_at': '2024-11-14T19:56:30.452808+00:00',
        'updated_at': '2024-11-14T19:56:30.452808+00:00'
    }
]

put_item 参数

python
await client.store.put_item(
    namespace,     # 命名空间元组
    key=key,       # 唯一键(字符串)
    value=value    # 数据(字典)
)

Key 管理策略

python
# 策略 1:使用 UUID(推荐)
from uuid import uuid4
key = str(uuid4())

# 策略 2:使用自定义 ID
key = f"todo_{user_id}_{task_id}"

# 策略 3:使用内容哈希
import hashlib
key = hashlib.md5(content.encode()).hexdigest()

5.5 删除记忆项

python
# 获取所有 item 的 keys
keys = [item['key'] for item in items['items']]
print(keys)
# 输出:['3de441ba-8c79-4beb-8f52-00e4dcba68d4', ...]

# 删除特定 item
await client.store.delete_item(
    ("testing", "Test"),                        # namespace
    key='3de441ba-8c79-4beb-8f52-00e4dcba68d4'  # key
)

验证删除

python
# 再次搜索
items = await client.store.search_items(
    ("testing", "Test"),
    limit=5,
    offset=0
)

items['items']
# 输出:只剩一个 item

Python 知识点 - 列表推导式

python
keys = [item['key'] for item in items['items']]

# 等价于:
keys = []
for item in items['items']:
    keys.append(item['key'])

# 更复杂的例子:
active_tasks = [
    item['value']['task']
    for item in items['items']
    if item['value']['status'] == 'not started'
]

5.6 Store API 完整总结

python
# 1. 搜索
items = await client.store.search_items(
    namespace,
    limit=10,
    offset=0
)

# 2. 添加/更新
await client.store.put_item(
    namespace,
    key=key,
    value=value
)

# 3. 获取单个
item = await client.store.get_item(
    namespace,
    key=key
)

# 4. 删除
await client.store.delete_item(
    namespace,
    key=key
)

六、LangGraph Platform 架构深入理解

6.1 完整的请求流程

同步请求(Blocking Run)

客户端
  ↓ 发送请求
HTTP Worker
  ↓ 创建 run_id
  ↓ 将任务加入队列(Redis)
  ↓ 等待完成
Queue Worker
  ↓ 从队列获取任务
  ↓ 执行图
  ↓ 写入结果(PostgreSQL)
  ↓ 通知完成(Redis)
HTTP Worker
  ↓ 接收完成通知
  ↓ 返回结果
客户端
  ↓ 接收响应

流式请求(Streaming Run)

客户端
  ↓ 发送流式请求
HTTP Worker
  ↓ 建立 WebSocket 连接
  ↓ 创建 run_id
  ↓ 订阅 Redis 频道
Queue Worker
  ↓ 执行图
  ↓ 发布更新到 Redis(每个节点)

Redis
  ↓ 实时传递更新
HTTP Worker
  ↓ 接收更新
  ↓ 推送给客户端
客户端
  ↓ 实时接收 chunks

6.2 数据流

客户端请求

┌───────────────────────────────────┐
│     LangGraph Server              │
│                                   │
│  HTTP Worker                      │
│    ↓                              │
│  Redis (消息队列/缓存)             │
│    ↓                              │
│  Queue Worker                     │
│    ↓                              │
│  PostgreSQL (持久化)              │
│    - threads                      │
│    - runs                         │
│    - checkpoints                  │
│    - store                        │
└───────────────────────────────────┘

客户端响应

6.3 扩展性

水平扩展

            负载均衡器

    ┌───────────┼───────────┐
    ↓           ↓           ↓
HTTP Worker HTTP Worker HTTP Worker
    ↓           ↓           ↓
    └───────────┼───────────┘

              Redis

    ┌───────────┼───────────┐
    ↓           ↓           ↓
Queue Worker Queue Worker Queue Worker
    ↓           ↓           ↓
    └───────────┼───────────┘

           PostgreSQL

扩展策略

  1. 增加 HTTP Workers → 处理更多并发请求
  2. 增加 Queue Workers → 处理更多任务
  3. Redis Cluster → 提高缓存性能
  4. PostgreSQL 主从复制 → 提高读性能

七、实战演示

7.1 完整的对话流程

python
from langgraph_sdk import get_client
from langchain_core.messages import HumanMessage

# 1. 连接到部署
client = get_client(url="http://localhost:8123")

# 2. 创建线程
thread = await client.threads.create()

# 3. 配置
config = {"configurable": {"user_id": "demo_user"}}

# 4. 第一轮对话
async for chunk in client.runs.stream(
    thread["thread_id"],
    "task_maistro",
    input={"messages": [HumanMessage("Add a ToDo to buy groceries")]},
    config=config,
    stream_mode="messages-tuple"
):
    if chunk.event == "messages":
        print("".join(
            d['content'] for d in chunk.data if 'content' in d
        ), end="", flush=True)

# 5. 第二轮对话(在同一线程中)
async for chunk in client.runs.stream(
    thread["thread_id"],
    "task_maistro",
    input={"messages": [HumanMessage("What's on my ToDo list?")]},
    config=config,
    stream_mode="messages-tuple"
):
    if chunk.event == "messages":
        print("".join(
            d['content'] for d in chunk.data if 'content' in d
        ), end="", flush=True)

# 6. 检查 Store
todos = await client.store.search_items(
    ("todo", "general", "demo_user")
)

for todo in todos['items']:
    print(f"- {todo['value']['task']}")

7.2 批量操作示例

python
# 批量创建 ToDos
tasks = [
    "Finish project report",
    "Call dentist for appointment",
    "Buy birthday gift for mom",
    "Review code PRs"
]

for task in tasks:
    await client.runs.create(
        thread["thread_id"],
        "task_maistro",
        input={"messages": [HumanMessage(f"Add ToDo: {task}")]},
        config=config
    )
    # 等待完成
    await client.runs.join(thread["thread_id"], run["run_id"])

# 获取所有 ToDos
all_todos = await client.store.search_items(
    ("todo", "general", "demo_user"),
    limit=100
)

print(f"Total: {len(all_todos['items'])} ToDos")

八、最佳实践

8.1 错误处理

python
import httpx

try:
    run = await client.runs.create(
        thread["thread_id"],
        "task_maistro",
        input={"messages": [HumanMessage("Hello")]},
        config=config
    )
except httpx.HTTPStatusError as e:
    if e.response.status_code == 409:
        print("Conflict: Another run is already in progress")
    elif e.response.status_code == 404:
        print("Not found: Thread or graph doesn't exist")
    else:
        print(f"HTTP error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

8.2 超时处理

python
import asyncio

async def run_with_timeout(client, thread_id, assistant_id, input, config, timeout=30):
    """运行图,带超时"""
    try:
        run = await client.runs.create(thread_id, assistant_id, input=input, config=config)
        await asyncio.wait_for(
            client.runs.join(thread_id, run["run_id"]),
            timeout=timeout
        )
        return await client.runs.get(thread_id, run["run_id"])
    except asyncio.TimeoutError:
        print(f"Run timed out after {timeout} seconds")
        return None

8.3 重试机制

python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def create_run_with_retry(client, thread_id, assistant_id, input, config):
    """创建运行,带重试"""
    return await client.runs.create(thread_id, assistant_id, input=input, config=config)

8.4 日志记录

python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def create_run_with_logging(client, thread_id, assistant_id, input, config):
    """创建运行,带日志"""
    logger.info(f"Creating run for thread {thread_id}")

    run = await client.runs.create(thread_id, assistant_id, input=input, config=config)

    logger.info(f"Run created: {run['run_id']}")
    logger.info(f"Status: {run['status']}")

    return run

九、常见问题和解决方案

9.1 连接失败

问题:无法连接到 http://localhost:8123

解决方案

bash
# 1. 检查部署是否运行
$ docker compose ps

# 2. 检查端口映射
$ docker compose ps | grep 8123

# 3. 测试连接
$ curl http://localhost:8123/health

# 4. 查看日志
$ docker compose logs langgraph-api

9.2 Run 一直是 pending 状态

问题:Run 长时间停留在 pending 状态

可能原因

  1. Queue Worker 没有运行
  2. Redis 连接问题
  3. 任务队列堵塞

解决方案

bash
# 检查所有服务状态
$ docker compose ps

# 查看 Queue Worker 日志
$ docker compose logs langgraph-api | grep -i "worker"

# 重启服务
$ docker compose restart

9.3 Thread 状态不更新

问题:线程状态看起来没有更新

原因:可能是缓存问题

解决方案

python
# 强制刷新状态
thread_state = await client.threads.get_state(
    thread['thread_id'],
    checkpoint_id=None  # 获取最新状态
)

9.4 Store 数据丢失

问题:Store 中的数据不见了

检查清单

  1. Namespace 是否正确?
  2. PostgreSQL 是否正常运行?
  3. 是否执行了 docker compose down -v(删除了数据卷)?

解决方案

python
# 验证 namespace
items = await client.store.search_items(
    ("todo", "general", "Test"),  # 确保 namespace 正确
    limit=100
)

print(f"Found {len(items['items'])} items")

十、性能优化

10.1 批量操作

不好的做法

python
# 逐个创建运行
for task in tasks:
    run = await client.runs.create(...)
    await client.runs.join(...)  # 等待每个完成

好的做法

python
# 并发创建运行
runs = []
for task in tasks:
    run = await client.runs.create(...)
    runs.append(run)

# 一次性等待所有完成
await asyncio.gather(*[
    client.runs.join(thread_id, run["run_id"])
    for run in runs
])

10.2 连接池

python
import httpx

# 使用连接池
async with httpx.AsyncClient() as http_client:
    client = get_client(
        url="http://localhost:8123",
        http_client=http_client
    )

    # 使用 client...

10.3 缓存

python
from functools import lru_cache
from datetime import datetime, timedelta

# 缓存线程状态
_state_cache = {}
_cache_ttl = timedelta(seconds=5)

async def get_thread_state_cached(client, thread_id):
    now = datetime.now()

    if thread_id in _state_cache:
        state, timestamp = _state_cache[thread_id]
        if now - timestamp < _cache_ttl:
            return state

    state = await client.threads.get_state(thread_id)
    _state_cache[thread_id] = (state, now)
    return state

十一、Python 和异步编程知识总结

11.1 async/await 基础

python
# 异步函数定义
async def my_async_function():
    # 异步操作
    result = await some_async_operation()
    return result

# 调用异步函数
result = await my_async_function()

# 在 Jupyter 中
# 可以直接使用 await
thread = await client.threads.create()

# 在普通 Python 脚本中
# 需要使用 asyncio.run()
import asyncio
asyncio.run(my_async_function())

11.2 异步迭代

python
# 异步 for 循环
async for chunk in client.runs.stream(...):
    print(chunk)

# 等价于同步版本:
# for item in sync_iterator:
#     print(item)

11.3 并发执行

python
import asyncio

# 方法 1:asyncio.gather(推荐)
results = await asyncio.gather(
    client.threads.create(),
    client.threads.create(),
    client.threads.create()
)
# 返回 [thread1, thread2, thread3]

# 方法 2:asyncio.wait
tasks = [
    client.threads.create(),
    client.threads.create()
]
done, pending = await asyncio.wait(tasks)

# 方法 3:手动管理
tasks = []
for i in range(10):
    task = asyncio.create_task(client.threads.create())
    tasks.append(task)

results = await asyncio.gather(*tasks)

11.4 异步上下文管理器

python
# 异步 with 语句
async with httpx.AsyncClient() as client:
    response = await client.get("http://...")

# 等价于:
client = httpx.AsyncClient()
try:
    response = await client.get("http://...")
finally:
    await client.aclose()

十二、总结

12.1 核心要点

  1. 两种连接方式

    • LangGraph SDK:全功能,推荐生产环境
    • Remote Graph:简化接口,快速原型
  2. 三种运行类型

    • Background Run:后台执行,不等待
    • Blocking Run:等待完成
    • Streaming Run:实时流式传输
  3. Threads 的威力

    • 多轮对话
    • 状态持久化
    • 时间旅行
    • Human in the loop
  4. Store API

    • 跨线程记忆
    • search/put/delete 操作
    • Namespace 组织

12.2 API 快速参考

python
# 连接
client = get_client(url="http://localhost:8123")

# Threads
thread = await client.threads.create()
state = await client.threads.get_state(thread_id)
copied = await client.threads.copy(thread_id)
history = await client.threads.get_history(thread_id)
await client.threads.update_state(thread_id, values, checkpoint_id)

# Runs
run = await client.runs.create(thread_id, assistant_id, input, config)
status = await client.runs.get(thread_id, run_id)
await client.runs.join(thread_id, run_id)
async for chunk in client.runs.stream(thread_id, assistant_id, ...):
    pass

# Store
items = await client.store.search_items(namespace, limit, offset)
await client.store.put_item(namespace, key, value)
item = await client.store.get_item(namespace, key)
await client.store.delete_item(namespace, key)

12.3 最佳实践清单

  • ✅ 使用 LangGraph SDK 而不是直接调用 REST API
  • ✅ 为长时间任务使用流式传输
  • ✅ 合理使用 threads 进行多轮对话
  • ✅ 利用 checkpoints 实现 Human in the loop
  • ✅ 使用 store 保存跨线程记忆
  • ✅ 实现错误处理和重试机制
  • ✅ 记录日志便于调试
  • ✅ 使用异步并发提高性能

12.4 下节预告

6.3-double-texting 中,我们将学习:

  • 什么是 double texting(并发请求)
  • Reject 策略:拒绝新请求
  • Enqueue 策略:排队等待
  • Interrupt 策略:中断当前
  • Rollback 策略:回滚重新开始
  • 如何选择合适的策略

文档版本:1.0 最后更新:2024-11-05 作者:AI Assistant 基于:LangChain Academy Module-6 Lesson 6.2

恭喜你掌握了 LangGraph Platform 的核心 API!现在你可以自由地连接和使用部署的图了。🎉

基于 MIT 许可证发布。内容版权归作者所有。