6.2 Connecting to a LangGraph Platform Deployment - 详细解读
一、概述
1.1 本节简介
在 6.1 节中,我们学会了如何创建 LangGraph 部署。现在在 6.2 节中,我们将学习如何连接和使用这个部署。这是从开发者视角转向用户视角的重要一步。
核心问题:
- 如何连接到部署的服务?
- 如何执行图的运行?
- 如何管理多轮对话?
- 如何操作长期记忆?
1.2 部署后可以访问什么?
一旦部署成功,你可以通过三种方式访问:
┌─────────────────────────────────────────────────┐
│ LangGraph Platform 部署 │
├─────────────────────────────────────────────────┤
│ │
│ 1. HTTP API │
│ http://localhost:8123 │
│ → 原始 REST API 端点 │
│ │
│ 2. API 文档 (Swagger UI) │
│ http://localhost:8123/docs │
│ → 交互式 API 测试界面 │
│ │
│ 3. LangGraph Studio │
│ https://smith.langchain.com/studio/ │
│ → 可视化调试和监控 │
│ │
└─────────────────────────────────────────────────┘
1.3 API 端点的三大类别
LangGraph Server 提供的 API 可以分为三大类:
API 端点分类
├── Runs(运行)
│ └── 原子化的图执行
│ - 单次执行
│ - 可以后台运行
│ - 可以流式传输
│
├── Threads(线程)
│ └── 多轮交互
│ - 对话历史
│ - 状态管理
│ - Human in the loop
│
└── Store(存储)
└── 长期记忆
- 跨线程数据
- 用户记忆
- 持久化存储
1.4 学习目标
通过本节学习,你将掌握:
连接到部署
- 使用 LangGraph SDK
- 使用 Remote Graph
Runs(运行)
- 后台运行(fire and forget)
- 阻塞运行(blocking)
- 流式运行(streaming)
Threads(线程)
- 查看线程状态
- 复制线程
- Human in the loop
- 状态编辑和时间旅行
Store API
- 搜索记忆项
- 添加记忆项
- 删除记忆项
1.5 从开发到生产的转变
开发环境:
# 直接调用图
result = graph.invoke({"messages": [HumanMessage("Hello")]})
生产环境:
# 通过 HTTP API 调用
run = await client.runs.create(
thread_id,
"task_maistro",
input={"messages": [HumanMessage("Hello")]}
)
二、连接到部署
2.1 部署启动确认
在开始之前,确保你的部署正在运行:
# 确认 docker-compose 正在运行
$ cd module-6/deployment
$ docker compose ps
# 应该看到三个服务都在运行
NAME STATUS
deployment-langgraph-api-1 Up
deployment-langgraph-postgres-1 Up
deployment-langgraph-redis-1 Up
测试连接:
# 测试 API 健康状态
$ curl http://localhost:8123/health
{"status": "ok"}
2.2 方法一:使用 LangGraph SDK
LangGraph SDK 是推荐的连接方式,提供了 Python 和 JavaScript 两个版本。
2.2.1 安装 SDK
%%capture --no-stderr
%pip install -U langgraph_sdk
命令解析:
%%capture --no-stderr
:Jupyter 魔法命令,捕获输出但显示错误-U
:升级到最新版本langgraph_sdk
:SDK 包名(注意是下划线)
2.2.2 创建客户端
from langgraph_sdk import get_client
# 连接到部署
url_for_cli_deployment = "http://localhost:8123"
client = get_client(url=url_for_cli_deployment)
代码解析:
get_client()
:创建 SDK 客户端url
:部署的 URL(可以是本地或远程)
客户端对象提供的功能:
client.runs # 运行管理
client.threads # 线程管理
client.store # 存储管理
client.assistants # 助手管理(6.4 节)
2.3 方法二:使用 Remote Graph
Remote Graph 允许你像使用本地图一样使用远程部署的图。
2.3.1 安装依赖
%%capture --no-stderr
%pip install -U langchain_openai langgraph langchain_core
2.3.2 创建 Remote Graph
from langgraph.pregel.remote import RemoteGraph
from langchain_core.messages import convert_to_messages
from langchain_core.messages import HumanMessage, SystemMessage
# 连接到远程图
url = "http://localhost:8123"
graph_name = "task_maistro"
remote_graph = RemoteGraph(graph_name, url=url)
参数说明:
graph_name
:图的名称(在 langgraph.json 中定义的)url
:部署的 URL
2.3.3 SDK vs Remote Graph
特性 | LangGraph SDK | Remote Graph |
---|---|---|
用途 | 全功能 API 客户端 | 类似本地图的接口 |
适合场景 | 生产环境,完整控制 | 快速原型,最小改动 |
功能范围 | 全部 API 功能 | 图执行和流式传输 |
学习曲线 | 需要了解 API 概念 | 熟悉 LangGraph 即可 |
灵活性 | 高(完全控制) | 中(简化接口) |
何时使用哪个?
使用 SDK:
- 生产环境应用
- 需要管理 threads、runs、store
- 需要完整的控制和监控
使用 Remote Graph:
- 快速原型开发
- 从本地图迁移到远程图
- 只需要基本的图执行功能
三、Runs(运行)
3.1 什么是 Run?
Run(运行) = 图的一次完整执行
关键特性:
- 每个 run 有唯一的 ID
- 结果存储在 PostgreSQL
- 可以查询状态和结果
- 可以追踪执行历史
Run 的生命周期:
pending → running → success/error
↓ ↓ ↓
创建 执行中 完成
3.2 Run 的类型
LangGraph Server 支持三种类型的 runs:
Run 类型
├── Background Run(后台运行)
│ └── Fire and forget
│ - 不等待完成
│ - 适合长时间任务
│ - 可以轮询状态
│
├── Blocking Run(阻塞运行)
│ └── Wait for completion
│ - 等待完成后返回
│ - 确保顺序执行
│ - 同步操作
│
└── Streaming Run(流式运行)
└── Real-time updates
- 实时接收更新
- 流式传输 tokens
- 改善用户体验
3.3 Background Runs(后台运行)
3.3.1 创建线程
# 创建一个新线程
thread = await client.threads.create()
thread
输出:
{
'thread_id': '7f71c0dd-768b-4e53-8349-42bdd10e7caf',
'created_at': '2024-11-14T19:36:08.459457+00:00',
'updated_at': '2024-11-14T19:36:08.459457+00:00',
'metadata': {},
'status': 'idle',
'config': {},
'values': None
}
字段说明:
字段 | 说明 | 示例值 |
---|---|---|
thread_id | 线程唯一标识 | UUID |
created_at | 创建时间 | ISO 8601 格式 |
updated_at | 更新时间 | ISO 8601 格式 |
status | 当前状态 | idle/busy |
metadata | 元数据 | 字典 |
config | 配置 | 字典 |
values | 当前状态值 | None(新线程) |
3.3.2 检查线程上的运行
# 检查线程上是否有运行
thread = await client.threads.create()
runs = await client.runs.list(thread["thread_id"])
print(runs)
输出:
[]
- 新线程没有任何运行记录
3.3.3 创建后台运行
# 创建一些 ToDos
user_input = "Add a ToDo to finish booking travel to Hong Kong by end of next week. Also, add a ToDo to call parents back about Thanksgiving plans."
config = {"configurable": {"user_id": "Test"}}
graph_name = "task_maistro"
# 创建运行(后台执行)
run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input)]},
config=config
)
参数详解:
await client.runs.create(
thread_id, # 线程 ID
assistant_id, # 图/助手名称
input={...}, # 输入数据
config={...} # 配置(如 user_id)
)
这是一个后台运行:
- 函数立即返回
- 不等待执行完成
- 图在后台异步执行
3.3.4 启动新运行并检查状态
# 创建新线程和新运行
thread = await client.threads.create()
user_input = "Give me a summary of all ToDos."
config = {"configurable": {"user_id": "Test"}}
graph_name = "task_maistro"
# 创建运行
run = await client.runs.create(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input)]},
config=config
)
# 检查运行状态
print(await client.runs.get(thread["thread_id"], run["run_id"]))
输出(简化):
{
'run_id': '1efa2c00-63e4-6f4a-9c5b-ca3f5f9bff07',
'thread_id': '641c195a-9e31-4250-a729-6b742c089df8',
'status': 'pending', # 仍在运行
'created_at': '2024-11-14T19:38:29.394777+00:00',
'updated_at': '2024-11-14T19:38:29.394777+00:00',
...
}
注意:'status': 'pending'
表示运行仍在进行中。
Run 状态说明:
状态 | 含义 | 说明 |
---|---|---|
pending | 等待执行 | 在队列中 |
running | 正在执行 | Queue worker 处理中 |
success | 成功完成 | 正常结束 |
error | 执行失败 | 发生错误 |
interrupted | 被中断 | 人为中断 |
cancelled | 被取消 | 主动取消 |
3.4 Blocking Runs(阻塞运行)
3.4.1 使用 client.runs.join
如果你想等待运行完成(阻塞运行),可以使用 client.runs.join
:
# 等待运行完成
await client.runs.join(thread["thread_id"], run["run_id"])
# 再次检查状态
print(await client.runs.get(thread["thread_id"], run["run_id"]))
输出:
{
'run_id': '1efa2c00-63e4-6f4a-9c5b-ca3f5f9bff07',
'thread_id': '641c195a-9e31-4250-a729-6b742c089df8',
'status': 'success', # 已完成
...
}
client.runs.join
的作用:
- 阻塞当前执行
- 等待指定 run 完成
- 确保在该线程上不会启动新 run,直到当前 run 完成
使用场景:
- 需要确保顺序执行
- 后续操作依赖当前结果
- 测试和调试
3.5 Streaming Runs(流式运行)
3.5.1 流式传输的架构
客户端发起流式请求
↓
HTTP Worker 生成 run_id
↓
Queue Worker 开始执行
↓ (执行过程中)
Queue Worker 发布更新到 Redis
↓
HTTP Worker 从 Redis 订阅更新
↓
实时推送给客户端
↓
执行完成,关闭连接
3.5.2 流式传输 tokens
最常用的流式传输方式是 streaming tokens(逐 token 返回),改善用户体验。
user_input = "What ToDo should I focus on first."
async for chunk in client.runs.stream(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input)]},
config=config,
stream_mode="messages-tuple"
):
if chunk.event == "messages":
print("".join(
data_item['content']
for data_item in chunk.data
if 'content' in data_item
), end="", flush=True)
输出(实时显示):
You might want to focus on "Call parents back about Thanksgiving plans" first. It has a shorter estimated time to complete (15 minutes) and doesn't have a specific deadline, so it could be a quick task to check off your list. Once that's done, you can dedicate more time to "Finish booking travel to Hong Kong," which is more time-consuming and has a deadline.
代码详解:
client.runs.stream()
:
async for chunk in client.runs.stream(
thread_id, # 线程 ID
assistant_id, # 图名称
input={...}, # 输入
config={...}, # 配置
stream_mode="messages-tuple" # 流式模式
):
# 处理每个 chunk
pass
stream_mode
选项:
模式 | 说明 | 用途 |
---|---|---|
values | 完整的状态值 | 查看每个节点后的完整状态 |
updates | 状态更新 | 查看每个节点的更新 |
messages | 消息列表 | 查看消息流 |
messages-tuple | 消息元组(推荐) | 逐 token 流式传输 |
custom | 自定义流 | 高级用例 |
打印逻辑:
if chunk.event == "messages":
# 提取所有包含 'content' 的 data_item
content = "".join(
data_item['content']
for data_item in chunk.data
if 'content' in data_item
)
# 实时打印,不换行
print(content, end="", flush=True)
Python 知识点 - print 参数:
print(text, end="", flush=True)
# ^^^^^^ ^^^^^^^^^^
# 不换行 立即刷新缓冲区
# 默认行为:
print(text) # 等价于 print(text, end="\n", flush=False)
# end="" 的作用:
print("Hello", end="")
print("World")
# 输出:HelloWorld
# flush=True 的作用:
# 立即输出,不等待缓冲区满
# 对于实时流式传输很重要
3.5.3 流式传输的优势
没有流式传输:
用户发送请求
↓
等待 30 秒...(用户看不到任何反馈)
↓
一次性返回完整响应
有流式传输:
用户发送请求
↓
立即看到开始生成
↓
逐字实时显示
↓
用户体验更好
四、Threads(线程)
4.1 Run vs Thread
重要区别:
Run(运行) Thread(线程)
↓ ↓
一次执行 多轮交互
无状态 有状态
单次任务 对话历史
类比:
- Run = 一次函数调用
- Thread = 一个会话/对话
4.2 Thread 的持久化
当客户端使用 thread_id
执行图时:
- 服务器保存所有 checkpoints(检查点)到线程
- Checkpoints 存储在 PostgreSQL 中
- 每个节点执行后都会创建一个 checkpoint
- 可以从任何 checkpoint 继续执行
Checkpoint 的作用:
- 保存执行历史
- 支持时间旅行(time travel)
- 实现 Human in the loop
- 错误恢复
4.3 检查线程状态
# 获取线程的当前状态
thread_state = await client.threads.get_state(thread['thread_id'])
# 打印消息历史
for m in convert_to_messages(thread_state['values']['messages']):
m.pretty_print()
输出:
================================ Human Message =================================
Give me a summary of all ToDos.
================================== Ai Message ==================================
Here's a summary of your current ToDo list:
1. **Task:** Finish booking travel to Hong Kong
- **Status:** Not started
- **Deadline:** November 22, 2024
- **Solutions:**
- Check flight prices on Skyscanner
- Book hotel through Booking.com
- Arrange airport transfer
- **Estimated Time to Complete:** 120 minutes
2. **Task:** Call parents back about Thanksgiving plans
- **Status:** Not started
- **Deadline:** None
- **Solutions:**
- Check calendar for availability
- Discuss travel arrangements
- Confirm dinner plans
- **Estimated Time to Complete:** 15 minutes
Let me know if there's anything else you'd like to do with your ToDo list!
================================ Human Message =================================
What ToDo should I focus on first.
================================== Ai Message ==================================
You might want to focus on "Call parents back about Thanksgiving plans" first...
分析:
- 线程保存了完整的对话历史
- 包含所有用户输入和 AI 响应
- 这是多轮对话的基础
4.4 复制线程(Fork)
复制线程允许你"分叉"对话,创建独立的分支。
# 复制线程
copied_thread = await client.threads.copy(thread['thread_id'])
# 检查复制的线程状态
copied_thread_state = await client.threads.get_state(copied_thread['thread_id'])
for m in convert_to_messages(copied_thread_state['values']['messages']):
m.pretty_print()
输出:复制的线程包含完全相同的历史记录。
使用场景:
原始线程
│
├─ Run 1: "给我 ToDo 总结"
├─ Run 2: "应该先做哪个?"
│
├─────────┬─────────┐
│ │ │
复制线程 A 复制线程 B 原始线程继续
│ │ │
↓ ↓ ↓
探索不同 测试新 正常使用
的对话 功能
路径
实际应用:
- A/B 测试:测试不同的对话路径
- What-if 分析:探索不同的决策结果
- 并行实验:在不影响原始对话的情况下测试
- 快照保存:保存对话的某个时刻
4.5 Human in the Loop
4.5.1 概念回顾
在 Module 3 中,我们学习了 Human in the loop(人机协作):
- 在执行过程中暂停
- 人类检查和修改状态
- 从修改后的状态继续执行
4.5.2 获取线程历史
# 获取线程的完整历史
states = await client.threads.get_history(thread['thread_id'])
# 选择一个历史状态来分叉
to_fork = states[-2] # 倒数第二个状态
to_fork['values']
输出:
{
'messages': [
{
'content': 'Give me a summary of all ToDos.',
'type': 'human',
'id': '3680da45-e3a5-4a47-b5b1-4fd4d3e8baf9',
...
}
]
}
states 列表结构:
states = [
state_0, # 最新状态
state_1, # 上一个状态
state_2, # 更早的状态
...
]
# states[-1] = 最早的状态
# states[-2] = 倒数第二个状态
# states[0] = 最新状态
4.5.3 检查状态详情
# 检查消息 ID
print(to_fork['values']['messages'][0]['id'])
# 输出:'3680da45-e3a5-4a47-b5b1-4fd4d3e8baf9'
# 检查下一个要执行的节点
print(to_fork['next'])
# 输出:['task_mAIstro']
# 检查 checkpoint ID
print(to_fork['checkpoint_id'])
# 输出:'1efa2c00-6609-67ff-8000-491b1dcf8129'
字段说明:
字段 | 说明 | 用途 |
---|---|---|
values | 当前状态值 | 包含消息、数据等 |
next | 下一个要执行的节点 | 知道从哪里继续 |
checkpoint_id | 检查点 ID | 用于时间旅行 |
metadata | 元数据 | 额外信息 |
4.5.4 编辑状态
关键概念:消息 reducer 的工作方式
# 回顾 Module-2:消息 reducer
def add_messages(left, right):
# 如果提供了消息 ID,则更新(覆盖)
# 如果没有 ID,则追加
...
编辑状态的技巧:
- 不提供 ID → 追加新消息
- 提供 ID → 覆盖现有消息
# 创建新消息,但使用旧消息的 ID(覆盖)
forked_input = {
"messages": HumanMessage(
content="Give me a summary of all ToDos that need to be done in the next week.",
id=to_fork['values']['messages'][0]['id'] # 使用旧 ID
)
}
# 更新状态,创建新的 checkpoint
forked_config = await client.threads.update_state(
thread["thread_id"],
forked_input,
checkpoint_id=to_fork['checkpoint_id'] # 从这个 checkpoint 分叉
)
这行代码的作用:
原始线程历史:
checkpoint_0 → checkpoint_1 → checkpoint_2
↑
└─ 从这里分叉
创建新分支:
checkpoint_1' → checkpoint_2' → ...
4.5.5 从新 checkpoint 继续执行
# 从新 checkpoint 运行图
async for chunk in client.runs.stream(
thread["thread_id"],
graph_name,
input=None, # 不需要新输入,使用更新后的状态
config=config,
checkpoint_id=forked_config['checkpoint_id'], # 从这个 checkpoint 开始
stream_mode="messages-tuple"
):
if chunk.event == "messages":
print("".join(
data_item['content']
for data_item in chunk.data
if 'content' in data_item
), end="", flush=True)
输出:
Here's a summary of your ToDos that need to be done in the next week:
1. **Finish booking travel to Hong Kong**
- **Status:** Not started
- **Deadline:** November 22, 2024
- **Solutions:**
- Check flight prices on Skyscanner
- Book hotel through Booking.com
- Arrange airport transfer
- **Estimated Time to Complete:** 120 minutes
It looks like this task is due soon, so you might want to prioritize it. Let me know if there's anything else you need help with!
注意:
- AI 只返回了下周需要完成的任务
- 没有返回没有截止日期的任务
- 这证明了状态编辑成功了!
4.5.6 时间旅行(Time Travel)总结
完整的时间旅行流程:
1. 获取历史:get_history()
↓
2. 选择检查点:states[-2]
↓
3. 检查详情:next, checkpoint_id
↓
4. 编辑状态:update_state() with message ID
↓
5. 创建新分支:新的 checkpoint
↓
6. 从新分支继续:stream() with checkpoint_id
实际应用场景:
用户纠正错误:
用户:"预订香港的机票" AI:[错误理解] 用户:← 回到上一步,重新输入
探索不同选项:
选择点 A ├─ 选项 1 → 结果 1 ├─ 选项 2 → 结果 2 └─ 选项 3 → 结果 3
调试和测试:
问题出现 ↓ 回到出错前的状态 ↓ 修改输入重新执行 ↓ 验证修复
五、跨线程记忆(Store API)
5.1 Store 的作用回顾
在 Module-5 中,我们学习了 Store(长期记忆):
- 保存跨线程的信息
- 按 namespace 组织
- 持久化存储
开发 vs 生产:
环境 | Store 实现 |
---|---|
开发 | InMemoryStore (内存) |
生产 | PostgreSQL(数据库) |
5.2 task_maistro 的 Store 使用
task_maistro
图使用 Store 保存 ToDos:
Namespace 结构:
("todo", "todo_category", "user_id")
# 例如:
("todo", "general", "Test")
("todo", "work", "lance")
("todo", "personal", "lance")
默认值:
todo_category
默认为"general"
(在deployment/configuration.py
中定义)
5.3 搜索记忆项
# 搜索特定 namespace 中的所有 items
items = await client.store.search_items(
("todo", "general", "Test"),
limit=5, # 最多返回 5 个
offset=0 # 从第 0 个开始
)
items['items']
输出:
[
{
'value': {
'task': 'Finish booking travel to Hong Kong',
'status': 'not started',
'deadline': '2024-11-22T23:59:59',
'solutions': [
'Check flight prices on Skyscanner',
'Book hotel through Booking.com',
'Arrange airport transfer'
],
'time_to_complete': 120
},
'key': '18524803-c182-49de-9b10-08ccb0a06843',
'namespace': ['todo', 'general', 'Test'],
'created_at': '2024-11-14T19:37:41.664827+00:00',
'updated_at': '2024-11-14T19:37:41.664827+00:00'
},
{
'value': {
'task': 'Call parents back about Thanksgiving plans',
'status': 'not started',
'deadline': None,
'solutions': [
'Check calendar for availability',
'Discuss travel arrangements',
'Confirm dinner plans'
],
'time_to_complete': 15
},
'key': '375d9596-edf8-4de2-985b-bacdc623d6ef',
'namespace': ['todo', 'general', 'Test'],
'created_at': '2024-11-14T19:37:41.664827+00:00',
'updated_at': '2024-11-14T19:37:41.664827+00:00'
}
]
字段说明:
字段 | 说明 | 示例 |
---|---|---|
value | 实际数据(字典) | ToDo 的详细信息 |
key | 唯一标识(UUID) | 用于更新/删除 |
namespace | 命名空间 | ["todo", "general", "Test"] |
created_at | 创建时间 | ISO 8601 格式 |
updated_at | 更新时间 | ISO 8601 格式 |
search_items
参数:
await client.store.search_items(
namespace, # 必需:命名空间元组
limit=10, # 可选:最大返回数量
offset=0, # 可选:偏移量(分页)
filter={} # 可选:过滤条件
)
分页示例:
# 第一页(0-9)
page1 = await client.store.search_items(
("todo", "general", "Test"),
limit=10,
offset=0
)
# 第二页(10-19)
page2 = await client.store.search_items(
("todo", "general", "Test"),
limit=10,
offset=10
)
5.4 添加记忆项
在图的代码中,我们使用 store.put()
添加项。 使用 SDK,我们可以在图外部直接添加项:
from uuid import uuid4
# 直接添加项到 store
await client.store.put_item(
("testing", "Test"), # namespace
key=str(uuid4()), # 生成唯一 key
value={"todo": "Test SDK put_item"} # 数据
)
验证添加:
# 搜索刚添加的项
items = await client.store.search_items(
("testing", "Test"),
limit=5,
offset=0
)
items['items']
输出:
[
{
'value': {'todo': 'Test SDK put_item'},
'key': '3de441ba-8c79-4beb-8f52-00e4dcba68d4',
'namespace': ['testing', 'Test'],
'created_at': '2024-11-14T19:56:30.452808+00:00',
'updated_at': '2024-11-14T19:56:30.452808+00:00'
}
]
put_item
参数:
await client.store.put_item(
namespace, # 命名空间元组
key=key, # 唯一键(字符串)
value=value # 数据(字典)
)
Key 管理策略:
# 策略 1:使用 UUID(推荐)
from uuid import uuid4
key = str(uuid4())
# 策略 2:使用自定义 ID
key = f"todo_{user_id}_{task_id}"
# 策略 3:使用内容哈希
import hashlib
key = hashlib.md5(content.encode()).hexdigest()
5.5 删除记忆项
# 获取所有 item 的 keys
keys = [item['key'] for item in items['items']]
print(keys)
# 输出:['3de441ba-8c79-4beb-8f52-00e4dcba68d4', ...]
# 删除特定 item
await client.store.delete_item(
("testing", "Test"), # namespace
key='3de441ba-8c79-4beb-8f52-00e4dcba68d4' # key
)
验证删除:
# 再次搜索
items = await client.store.search_items(
("testing", "Test"),
limit=5,
offset=0
)
items['items']
# 输出:只剩一个 item
Python 知识点 - 列表推导式:
keys = [item['key'] for item in items['items']]
# 等价于:
keys = []
for item in items['items']:
keys.append(item['key'])
# 更复杂的例子:
active_tasks = [
item['value']['task']
for item in items['items']
if item['value']['status'] == 'not started'
]
5.6 Store API 完整总结
# 1. 搜索
items = await client.store.search_items(
namespace,
limit=10,
offset=0
)
# 2. 添加/更新
await client.store.put_item(
namespace,
key=key,
value=value
)
# 3. 获取单个
item = await client.store.get_item(
namespace,
key=key
)
# 4. 删除
await client.store.delete_item(
namespace,
key=key
)
六、LangGraph Platform 架构深入理解
6.1 完整的请求流程
同步请求(Blocking Run):
客户端
↓ 发送请求
HTTP Worker
↓ 创建 run_id
↓ 将任务加入队列(Redis)
↓ 等待完成
Queue Worker
↓ 从队列获取任务
↓ 执行图
↓ 写入结果(PostgreSQL)
↓ 通知完成(Redis)
HTTP Worker
↓ 接收完成通知
↓ 返回结果
客户端
↓ 接收响应
流式请求(Streaming Run):
客户端
↓ 发送流式请求
HTTP Worker
↓ 建立 WebSocket 连接
↓ 创建 run_id
↓ 订阅 Redis 频道
Queue Worker
↓ 执行图
↓ 发布更新到 Redis(每个节点)
↓
Redis
↓ 实时传递更新
HTTP Worker
↓ 接收更新
↓ 推送给客户端
客户端
↓ 实时接收 chunks
6.2 数据流
客户端请求
↓
┌───────────────────────────────────┐
│ LangGraph Server │
│ │
│ HTTP Worker │
│ ↓ │
│ Redis (消息队列/缓存) │
│ ↓ │
│ Queue Worker │
│ ↓ │
│ PostgreSQL (持久化) │
│ - threads │
│ - runs │
│ - checkpoints │
│ - store │
└───────────────────────────────────┘
↓
客户端响应
6.3 扩展性
水平扩展:
负载均衡器
↓
┌───────────┼───────────┐
↓ ↓ ↓
HTTP Worker HTTP Worker HTTP Worker
↓ ↓ ↓
└───────────┼───────────┘
↓
Redis
↓
┌───────────┼───────────┐
↓ ↓ ↓
Queue Worker Queue Worker Queue Worker
↓ ↓ ↓
└───────────┼───────────┘
↓
PostgreSQL
扩展策略:
- 增加 HTTP Workers → 处理更多并发请求
- 增加 Queue Workers → 处理更多任务
- Redis Cluster → 提高缓存性能
- PostgreSQL 主从复制 → 提高读性能
七、实战演示
7.1 完整的对话流程
from langgraph_sdk import get_client
from langchain_core.messages import HumanMessage
# 1. 连接到部署
client = get_client(url="http://localhost:8123")
# 2. 创建线程
thread = await client.threads.create()
# 3. 配置
config = {"configurable": {"user_id": "demo_user"}}
# 4. 第一轮对话
async for chunk in client.runs.stream(
thread["thread_id"],
"task_maistro",
input={"messages": [HumanMessage("Add a ToDo to buy groceries")]},
config=config,
stream_mode="messages-tuple"
):
if chunk.event == "messages":
print("".join(
d['content'] for d in chunk.data if 'content' in d
), end="", flush=True)
# 5. 第二轮对话(在同一线程中)
async for chunk in client.runs.stream(
thread["thread_id"],
"task_maistro",
input={"messages": [HumanMessage("What's on my ToDo list?")]},
config=config,
stream_mode="messages-tuple"
):
if chunk.event == "messages":
print("".join(
d['content'] for d in chunk.data if 'content' in d
), end="", flush=True)
# 6. 检查 Store
todos = await client.store.search_items(
("todo", "general", "demo_user")
)
for todo in todos['items']:
print(f"- {todo['value']['task']}")
7.2 批量操作示例
# 批量创建 ToDos
tasks = [
"Finish project report",
"Call dentist for appointment",
"Buy birthday gift for mom",
"Review code PRs"
]
for task in tasks:
await client.runs.create(
thread["thread_id"],
"task_maistro",
input={"messages": [HumanMessage(f"Add ToDo: {task}")]},
config=config
)
# 等待完成
await client.runs.join(thread["thread_id"], run["run_id"])
# 获取所有 ToDos
all_todos = await client.store.search_items(
("todo", "general", "demo_user"),
limit=100
)
print(f"Total: {len(all_todos['items'])} ToDos")
八、最佳实践
8.1 错误处理
import httpx
try:
run = await client.runs.create(
thread["thread_id"],
"task_maistro",
input={"messages": [HumanMessage("Hello")]},
config=config
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 409:
print("Conflict: Another run is already in progress")
elif e.response.status_code == 404:
print("Not found: Thread or graph doesn't exist")
else:
print(f"HTTP error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
8.2 超时处理
import asyncio
async def run_with_timeout(client, thread_id, assistant_id, input, config, timeout=30):
"""运行图,带超时"""
try:
run = await client.runs.create(thread_id, assistant_id, input=input, config=config)
await asyncio.wait_for(
client.runs.join(thread_id, run["run_id"]),
timeout=timeout
)
return await client.runs.get(thread_id, run["run_id"])
except asyncio.TimeoutError:
print(f"Run timed out after {timeout} seconds")
return None
8.3 重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def create_run_with_retry(client, thread_id, assistant_id, input, config):
"""创建运行,带重试"""
return await client.runs.create(thread_id, assistant_id, input=input, config=config)
8.4 日志记录
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def create_run_with_logging(client, thread_id, assistant_id, input, config):
"""创建运行,带日志"""
logger.info(f"Creating run for thread {thread_id}")
run = await client.runs.create(thread_id, assistant_id, input=input, config=config)
logger.info(f"Run created: {run['run_id']}")
logger.info(f"Status: {run['status']}")
return run
九、常见问题和解决方案
9.1 连接失败
问题:无法连接到 http://localhost:8123
解决方案:
# 1. 检查部署是否运行
$ docker compose ps
# 2. 检查端口映射
$ docker compose ps | grep 8123
# 3. 测试连接
$ curl http://localhost:8123/health
# 4. 查看日志
$ docker compose logs langgraph-api
9.2 Run 一直是 pending 状态
问题:Run 长时间停留在 pending
状态
可能原因:
- Queue Worker 没有运行
- Redis 连接问题
- 任务队列堵塞
解决方案:
# 检查所有服务状态
$ docker compose ps
# 查看 Queue Worker 日志
$ docker compose logs langgraph-api | grep -i "worker"
# 重启服务
$ docker compose restart
9.3 Thread 状态不更新
问题:线程状态看起来没有更新
原因:可能是缓存问题
解决方案:
# 强制刷新状态
thread_state = await client.threads.get_state(
thread['thread_id'],
checkpoint_id=None # 获取最新状态
)
9.4 Store 数据丢失
问题:Store 中的数据不见了
检查清单:
- Namespace 是否正确?
- PostgreSQL 是否正常运行?
- 是否执行了
docker compose down -v
(删除了数据卷)?
解决方案:
# 验证 namespace
items = await client.store.search_items(
("todo", "general", "Test"), # 确保 namespace 正确
limit=100
)
print(f"Found {len(items['items'])} items")
十、性能优化
10.1 批量操作
不好的做法:
# 逐个创建运行
for task in tasks:
run = await client.runs.create(...)
await client.runs.join(...) # 等待每个完成
好的做法:
# 并发创建运行
runs = []
for task in tasks:
run = await client.runs.create(...)
runs.append(run)
# 一次性等待所有完成
await asyncio.gather(*[
client.runs.join(thread_id, run["run_id"])
for run in runs
])
10.2 连接池
import httpx
# 使用连接池
async with httpx.AsyncClient() as http_client:
client = get_client(
url="http://localhost:8123",
http_client=http_client
)
# 使用 client...
10.3 缓存
from functools import lru_cache
from datetime import datetime, timedelta
# 缓存线程状态
_state_cache = {}
_cache_ttl = timedelta(seconds=5)
async def get_thread_state_cached(client, thread_id):
now = datetime.now()
if thread_id in _state_cache:
state, timestamp = _state_cache[thread_id]
if now - timestamp < _cache_ttl:
return state
state = await client.threads.get_state(thread_id)
_state_cache[thread_id] = (state, now)
return state
十一、Python 和异步编程知识总结
11.1 async/await 基础
# 异步函数定义
async def my_async_function():
# 异步操作
result = await some_async_operation()
return result
# 调用异步函数
result = await my_async_function()
# 在 Jupyter 中
# 可以直接使用 await
thread = await client.threads.create()
# 在普通 Python 脚本中
# 需要使用 asyncio.run()
import asyncio
asyncio.run(my_async_function())
11.2 异步迭代
# 异步 for 循环
async for chunk in client.runs.stream(...):
print(chunk)
# 等价于同步版本:
# for item in sync_iterator:
# print(item)
11.3 并发执行
import asyncio
# 方法 1:asyncio.gather(推荐)
results = await asyncio.gather(
client.threads.create(),
client.threads.create(),
client.threads.create()
)
# 返回 [thread1, thread2, thread3]
# 方法 2:asyncio.wait
tasks = [
client.threads.create(),
client.threads.create()
]
done, pending = await asyncio.wait(tasks)
# 方法 3:手动管理
tasks = []
for i in range(10):
task = asyncio.create_task(client.threads.create())
tasks.append(task)
results = await asyncio.gather(*tasks)
11.4 异步上下文管理器
# 异步 with 语句
async with httpx.AsyncClient() as client:
response = await client.get("http://...")
# 等价于:
client = httpx.AsyncClient()
try:
response = await client.get("http://...")
finally:
await client.aclose()
十二、总结
12.1 核心要点
两种连接方式
- LangGraph SDK:全功能,推荐生产环境
- Remote Graph:简化接口,快速原型
三种运行类型
- Background Run:后台执行,不等待
- Blocking Run:等待完成
- Streaming Run:实时流式传输
Threads 的威力
- 多轮对话
- 状态持久化
- 时间旅行
- Human in the loop
Store API
- 跨线程记忆
- search/put/delete 操作
- Namespace 组织
12.2 API 快速参考
# 连接
client = get_client(url="http://localhost:8123")
# Threads
thread = await client.threads.create()
state = await client.threads.get_state(thread_id)
copied = await client.threads.copy(thread_id)
history = await client.threads.get_history(thread_id)
await client.threads.update_state(thread_id, values, checkpoint_id)
# Runs
run = await client.runs.create(thread_id, assistant_id, input, config)
status = await client.runs.get(thread_id, run_id)
await client.runs.join(thread_id, run_id)
async for chunk in client.runs.stream(thread_id, assistant_id, ...):
pass
# Store
items = await client.store.search_items(namespace, limit, offset)
await client.store.put_item(namespace, key, value)
item = await client.store.get_item(namespace, key)
await client.store.delete_item(namespace, key)
12.3 最佳实践清单
- ✅ 使用 LangGraph SDK 而不是直接调用 REST API
- ✅ 为长时间任务使用流式传输
- ✅ 合理使用 threads 进行多轮对话
- ✅ 利用 checkpoints 实现 Human in the loop
- ✅ 使用 store 保存跨线程记忆
- ✅ 实现错误处理和重试机制
- ✅ 记录日志便于调试
- ✅ 使用异步并发提高性能
12.4 下节预告
在 6.3-double-texting 中,我们将学习:
- 什么是 double texting(并发请求)
- Reject 策略:拒绝新请求
- Enqueue 策略:排队等待
- Interrupt 策略:中断当前
- Rollback 策略:回滚重新开始
- 如何选择合适的策略
文档版本:1.0 最后更新:2024-11-05 作者:AI Assistant 基于:LangChain Academy Module-6 Lesson 6.2
恭喜你掌握了 LangGraph Platform 的核心 API!现在你可以自由地连接和使用部署的图了。🎉