Skip to content

PydanticAI 与 LangGraph 协同集成

本章探讨如何让 PydanticAI 和 LangGraph 协同工作,发挥各自优势。

本章概览

本章将介绍:

  • 为什么需要协同集成
  • 四种集成模式
  • 完整的集成示例
  • 最佳实践和注意事项

1. 为什么需要协同

1.1 互补优势

┌─────────────────────────────────────────────────────────────────┐
│                      协同集成价值                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  PydanticAI 贡献:              LangGraph 贡献:                 │
│  ─────────────────             ─────────────────                │
│                                                                  │
│  ✓ 类型安全的输出              ✓ 复杂流程编排                   │
│  ✓ 优雅的依赖注入              ✓ 精确的状态管理                 │
│  ✓ 简洁的 Agent 定义           ✓ 多 Agent 协作                  │
│  ✓ 内置验证和重试              ✓ 检查点和恢复                   │
│  ✓ Logfire 可观测              ✓ 条件路由控制                   │
│                                                                  │
│                       ▼                                          │
│              ┌─────────────────────┐                            │
│              │   协同集成系统       │                            │
│              │                     │                            │
│              │  最佳开发体验       │                            │
│              │  +                  │                            │
│              │  最强编排能力       │                            │
│              └─────────────────────┘                            │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

1.2 典型场景

场景PydanticAI 角色LangGraph 角色
多专家系统定义各专家 Agent编排专家协作流程
RAG 系统处理检索结果管理检索-生成流程
工作流自动化执行具体任务定义工作流状态机
审批系统生成审批意见管理审批流程

2. 四种集成模式

2.1 模式一:PydanticAI Agent 作为 LangGraph 节点

最常用的模式:将 PydanticAI Agent 包装为 LangGraph 节点。

python
from langgraph.graph import StateGraph, START, END
from pydantic_ai import Agent
from pydantic import BaseModel
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

# ========== PydanticAI 定义 ==========
class AnalysisResult(BaseModel):
    summary: str
    sentiment: str
    confidence: float

analyst_agent = Agent(
    'openai:gpt-4o',
    output_type=AnalysisResult,
    instructions='你是一位专业分析师,分析用户提供的内容。'
)

class WritingResult(BaseModel):
    title: str
    content: str

writer_agent = Agent(
    'openai:gpt-4o',
    output_type=WritingResult,
    instructions='你是一位专业作家,根据分析结果撰写文章。'
)

# ========== LangGraph 状态 ==========
class WorkflowState(TypedDict):
    input_text: str
    analysis: AnalysisResult | None
    article: WritingResult | None
    messages: Annotated[list, add_messages]

# ========== 包装为节点 ==========
async def analyze_node(state: WorkflowState) -> dict:
    """分析节点:使用 PydanticAI Agent"""
    result = await analyst_agent.run(state["input_text"])
    return {
        "analysis": result.output,
        "messages": [("assistant", f"分析完成: {result.output.summary}")]
    }

async def write_node(state: WorkflowState) -> dict:
    """写作节点:使用 PydanticAI Agent"""
    prompt = f"""
    基于以下分析撰写文章:
    摘要:{state["analysis"].summary}
    情感:{state["analysis"].sentiment}
    """
    result = await writer_agent.run(prompt)
    return {
        "article": result.output,
        "messages": [("assistant", f"文章完成: {result.output.title}")]
    }

# ========== 构建图 ==========
graph = StateGraph(WorkflowState)
graph.add_node("analyze", analyze_node)
graph.add_node("write", write_node)
graph.add_edge(START, "analyze")
graph.add_edge("analyze", "write")
graph.add_edge("write", END)

workflow = graph.compile()

# ========== 运行 ==========
async def main():
    result = await workflow.ainvoke({
        "input_text": "人工智能正在改变我们的生活...",
        "analysis": None,
        "article": None,
        "messages": []
    })
    print(f"分析: {result['analysis']}")
    print(f"文章: {result['article'].title}")

优势

  • PydanticAI 提供类型安全的输出
  • LangGraph 管理整体流程
  • 两者优势完美结合

2.2 模式二:LangGraph 作为 PydanticAI 工具

反向集成:将 LangGraph 图作为 PydanticAI 的工具。

python
from pydantic_ai import Agent, RunContext
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

# ========== 定义 LangGraph 工作流 ==========
class ResearchState(TypedDict):
    query: str
    results: list[str]

def search_node(state: ResearchState) -> dict:
    # 模拟搜索
    return {"results": [f"Result for: {state['query']}"]}

def summarize_node(state: ResearchState) -> dict:
    summary = "\n".join(state["results"])
    return {"results": [f"Summary: {summary}"]}

research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search_node)
research_graph.add_node("summarize", summarize_node)
research_graph.add_edge(START, "search")
research_graph.add_edge("search", "summarize")
research_graph.add_edge("summarize", END)

research_workflow = research_graph.compile()

# ========== PydanticAI Agent 使用 LangGraph 作为工具 ==========
main_agent = Agent(
    'openai:gpt-4o',
    instructions='你是一个研究助手,可以使用研究工具来回答问题。'
)

@main_agent.tool_plain
async def research(query: str) -> str:
    """
    执行研究任务

    Args:
        query: 研究主题
    """
    result = await research_workflow.ainvoke({"query": query, "results": []})
    return "\n".join(result["results"])

# 使用
async def main():
    result = await main_agent.run("研究一下量子计算的最新进展")
    print(result.output)

2.3 模式三:共享状态集成

深度集成:共享状态和依赖。

python
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Any

# ========== 共享依赖 ==========
@dataclass
class SharedDependencies:
    database: Any  # 数据库连接
    cache: Any     # 缓存
    user_id: int   # 当前用户

# ========== PydanticAI Agents ==========
query_agent = Agent(
    'openai:gpt-4o',
    deps_type=SharedDependencies,
    instructions='你负责理解用户查询意图。'
)

@query_agent.tool
async def get_user_history(ctx: RunContext[SharedDependencies]) -> str:
    """获取用户历史"""
    history = await ctx.deps.database.get_history(ctx.deps.user_id)
    return str(history)

action_agent = Agent(
    'openai:gpt-4o',
    deps_type=SharedDependencies,
    instructions='你负责执行具体操作。'
)

# ========== LangGraph 状态(包含共享依赖) ==========
class IntegratedState(TypedDict):
    query: str
    intent: str
    result: str
    deps: SharedDependencies  # 共享依赖

async def understand_node(state: IntegratedState) -> dict:
    """理解意图"""
    result = await query_agent.run(
        state["query"],
        deps=state["deps"]
    )
    return {"intent": result.output}

async def execute_node(state: IntegratedState) -> dict:
    """执行操作"""
    result = await action_agent.run(
        f"执行: {state['intent']}",
        deps=state["deps"]
    )
    return {"result": result.output}

# 构建集成图
integrated_graph = StateGraph(IntegratedState)
integrated_graph.add_node("understand", understand_node)
integrated_graph.add_node("execute", execute_node)
integrated_graph.add_edge(START, "understand")
integrated_graph.add_edge("understand", "execute")
integrated_graph.add_edge("execute", END)

app = integrated_graph.compile()

2.4 模式四:A2A 协议集成

标准化集成:通过 Agent-to-Agent 协议。

python
from pydantic_ai import Agent
from fasta2a import A2AClient
import httpx

# ========== PydanticAI Agent 作为 A2A 服务 ==========
specialist_agent = Agent(
    'openai:gpt-4o',
    instructions='你是领域专家。'
)

# 转换为 A2A 服务
a2a_app = specialist_agent.to_a2a()

# 运行: uvicorn a2a_server:a2a_app --port 8001

# ========== 在 LangGraph 中调用 A2A 服务 ==========
class A2AState(TypedDict):
    query: str
    expert_response: str

async def call_expert_node(state: A2AState) -> dict:
    """通过 A2A 协议调用专家"""
    async with httpx.AsyncClient() as client:
        a2a_client = A2AClient(
            base_url="http://localhost:8001",
            http_client=client
        )
        result = await a2a_client.send_task(
            message=state["query"]
        )
        return {"expert_response": result.output}

# 构建使用 A2A 的图
a2a_graph = StateGraph(A2AState)
a2a_graph.add_node("call_expert", call_expert_node)
a2a_graph.add_edge(START, "call_expert")
a2a_graph.add_edge("call_expert", END)

3. 完整集成示例:智能客服系统

3.1 系统架构

┌─────────────────────────────────────────────────────────────────┐
│                     智能客服系统架构                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│                       用户请求                                    │
│                          │                                       │
│                          ▼                                       │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                   LangGraph 编排层                         │  │
│  │                                                           │  │
│  │   ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────┐   │  │
│  │   │ 意图    │───►│ 路由    │───►│ 专家    │───►│ 输出│   │  │
│  │   │ 识别    │    │ 决策    │    │ 处理    │    │ 格式│   │  │
│  │   └─────────┘    └─────────┘    └─────────┘    └─────┘   │  │
│  │        │              │              │                    │  │
│  └────────┼──────────────┼──────────────┼────────────────────┘  │
│           │              │              │                       │
│           ▼              ▼              ▼                       │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                  PydanticAI Agent 层                       │  │
│  │                                                           │  │
│  │   ┌─────────┐    ┌─────────┐    ┌─────────┐              │  │
│  │   │ 意图    │    │ 订单    │    │ 技术    │    ...       │  │
│  │   │ Agent   │    │ Agent   │    │ Agent   │              │  │
│  │   └─────────┘    └─────────┘    └─────────┘              │  │
│  │                                                           │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

3.2 完整代码

python
"""
智能客服系统 - PydanticAI + LangGraph 协同示例
"""

from dataclasses import dataclass
from typing import TypedDict, Literal, Annotated
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# ============================================================
# 第一部分:共享依赖和数据模型
# ============================================================

@dataclass
class CustomerServiceDeps:
    """客服系统共享依赖"""
    customer_id: str
    db_connection: object  # 数据库连接
    order_api: object      # 订单 API

    async def get_customer_info(self) -> dict:
        """获取客户信息"""
        # 模拟数据库查询
        return {
            "name": "张三",
            "level": "VIP",
            "history": ["订单查询", "退款申请"]
        }

    async def get_recent_orders(self) -> list:
        """获取最近订单"""
        return [
            {"id": "ORD001", "status": "已发货", "amount": 299},
            {"id": "ORD002", "status": "待付款", "amount": 599},
        ]

# 意图分类结果
class IntentResult(BaseModel):
    intent: Literal["order", "refund", "technical", "general"]
    confidence: float = Field(ge=0, le=1)
    entities: dict = Field(default_factory=dict)

# 订单查询结果
class OrderResponse(BaseModel):
    found: bool
    order_id: str | None = None
    status: str | None = None
    message: str

# 最终响应
class FinalResponse(BaseModel):
    message: str
    suggested_actions: list[str] = Field(default_factory=list)
    satisfaction_prompt: bool = True

# ============================================================
# 第二部分:PydanticAI Agents 定义
# ============================================================

# 意图识别 Agent
intent_agent = Agent(
    'openai:gpt-4o',
    output_type=IntentResult,
    deps_type=CustomerServiceDeps,
    instructions='''
    你是意图识别专家。分析用户消息,识别以下意图之一:
    - order: 订单相关(查询、物流、状态)
    - refund: 退款退货相关
    - technical: 技术问题(产品使用、故障)
    - general: 一般咨询

    返回意图类型、置信度和提取的实体(如订单号)。
    '''
)

@intent_agent.tool
async def get_customer_context(ctx: RunContext[CustomerServiceDeps]) -> str:
    """获取客户上下文信息"""
    info = await ctx.deps.get_customer_info()
    return f"客户: {info['name']}, 等级: {info['level']}, 历史: {info['history']}"

# 订单处理 Agent
order_agent = Agent(
    'openai:gpt-4o',
    output_type=OrderResponse,
    deps_type=CustomerServiceDeps,
    instructions='''
    你是订单处理专家。处理用户的订单查询请求。
    使用工具查询订单信息,给出准确回复。
    '''
)

@order_agent.tool
async def query_orders(ctx: RunContext[CustomerServiceDeps]) -> str:
    """查询客户订单"""
    orders = await ctx.deps.get_recent_orders()
    return str(orders)

@order_agent.tool_plain
def format_order_status(order_id: str, status: str) -> str:
    """格式化订单状态"""
    status_map = {
        "已发货": "您的订单已发货,预计3-5天送达",
        "待付款": "订单待付款,请尽快完成支付",
        "已完成": "订单已完成,感谢您的购买",
    }
    return status_map.get(status, f"订单 {order_id} 状态: {status}")

# 退款处理 Agent
refund_agent = Agent(
    'openai:gpt-4o',
    deps_type=CustomerServiceDeps,
    instructions='''
    你是退款处理专家。帮助用户处理退款退货请求。
    了解退款原因,说明退款政策,引导用户完成退款流程。
    '''
)

# 技术支持 Agent
technical_agent = Agent(
    'openai:gpt-4o',
    instructions='''
    你是技术支持专家。解答产品使用问题和技术故障。
    提供清晰的步骤指导,必要时建议联系专业客服。
    '''
)

# 通用咨询 Agent
general_agent = Agent(
    'openai:gpt-4o',
    instructions='''
    你是通用咨询专家。回答一般性问题,介绍公司政策和服务。
    保持友好专业的态度。
    '''
)

# 响应格式化 Agent
formatter_agent = Agent(
    'openai:gpt-4o',
    output_type=FinalResponse,
    instructions='''
    你是响应格式化专家。将专家的回复整理成用户友好的格式。
    - 保持简洁专业
    - 添加相关的建议操作
    - 适当询问满意度
    '''
)

# ============================================================
# 第三部分:LangGraph 状态和节点
# ============================================================

class CustomerServiceState(TypedDict):
    """客服系统状态"""
    messages: Annotated[list, add_messages]
    user_input: str
    intent: IntentResult | None
    expert_response: str | None
    final_response: FinalResponse | None
    deps: CustomerServiceDeps

async def intent_node(state: CustomerServiceState) -> dict:
    """意图识别节点"""
    result = await intent_agent.run(
        state["user_input"],
        deps=state["deps"]
    )
    return {
        "intent": result.output,
        "messages": [("system", f"识别意图: {result.output.intent}")]
    }

async def order_node(state: CustomerServiceState) -> dict:
    """订单处理节点"""
    result = await order_agent.run(
        state["user_input"],
        deps=state["deps"]
    )
    return {
        "expert_response": result.output.message,
        "messages": [("assistant", result.output.message)]
    }

async def refund_node(state: CustomerServiceState) -> dict:
    """退款处理节点"""
    result = await refund_agent.run(
        state["user_input"],
        deps=state["deps"]
    )
    return {
        "expert_response": result.output,
        "messages": [("assistant", result.output)]
    }

async def technical_node(state: CustomerServiceState) -> dict:
    """技术支持节点"""
    result = await technical_agent.run(state["user_input"])
    return {
        "expert_response": result.output,
        "messages": [("assistant", result.output)]
    }

async def general_node(state: CustomerServiceState) -> dict:
    """通用咨询节点"""
    result = await general_agent.run(state["user_input"])
    return {
        "expert_response": result.output,
        "messages": [("assistant", result.output)]
    }

async def format_node(state: CustomerServiceState) -> dict:
    """响应格式化节点"""
    result = await formatter_agent.run(
        f"请格式化以下回复:\n{state['expert_response']}"
    )
    return {"final_response": result.output}

# 路由函数
def route_by_intent(state: CustomerServiceState) -> str:
    """根据意图路由到专家节点"""
    if state["intent"] is None:
        return "general"

    intent = state["intent"].intent
    route_map = {
        "order": "order",
        "refund": "refund",
        "technical": "technical",
        "general": "general",
    }
    return route_map.get(intent, "general")

# ============================================================
# 第四部分:构建工作流图
# ============================================================

def create_customer_service_graph():
    """创建客服工作流图"""

    graph = StateGraph(CustomerServiceState)

    # 添加节点
    graph.add_node("intent", intent_node)
    graph.add_node("order", order_node)
    graph.add_node("refund", refund_node)
    graph.add_node("technical", technical_node)
    graph.add_node("general", general_node)
    graph.add_node("format", format_node)

    # 添加边
    graph.add_edge(START, "intent")

    # 条件路由
    graph.add_conditional_edges(
        "intent",
        route_by_intent,
        {
            "order": "order",
            "refund": "refund",
            "technical": "technical",
            "general": "general",
        }
    )

    # 所有专家节点都连接到格式化节点
    graph.add_edge("order", "format")
    graph.add_edge("refund", "format")
    graph.add_edge("technical", "format")
    graph.add_edge("general", "format")

    # 格式化后结束
    graph.add_edge("format", END)

    return graph.compile()

# ============================================================
# 第五部分:运行系统
# ============================================================

async def handle_customer_query(
    query: str,
    customer_id: str,
) -> FinalResponse:
    """处理客户查询"""

    # 创建依赖
    deps = CustomerServiceDeps(
        customer_id=customer_id,
        db_connection=None,  # 实际项目中注入真实连接
        order_api=None,
    )

    # 创建工作流
    workflow = create_customer_service_graph()

    # 执行
    result = await workflow.ainvoke({
        "user_input": query,
        "intent": None,
        "expert_response": None,
        "final_response": None,
        "deps": deps,
        "messages": [],
    })

    return result["final_response"]

# 使用示例
async def main():
    # 测试不同类型的查询
    queries = [
        "我想查一下我的订单状态",
        "产品使用不了,一直报错",
        "我想申请退款",
        "你们公司在哪里?",
    ]

    for query in queries:
        print(f"\n用户: {query}")
        response = await handle_customer_query(query, "CUST001")
        print(f"客服: {response.message}")
        print(f"建议: {response.suggested_actions}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

4. 最佳实践

4.1 设计原则

┌─────────────────────────────────────────────────────────────────┐
│                      协同设计原则                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. 职责分离                                                     │
│     ─────────                                                    │
│     PydanticAI: 专注单个任务的类型安全执行                       │
│     LangGraph:  专注多任务的流程编排                             │
│                                                                  │
│  2. 状态管理                                                     │
│     ─────────                                                    │
│     • 使用 LangGraph 管理全局状态                                │
│     • 使用 PydanticAI deps 传递任务依赖                          │
│     • 避免状态混乱                                               │
│                                                                  │
│  3. 错误处理                                                     │
│     ─────────                                                    │
│     • PydanticAI 处理验证和重试                                  │
│     • LangGraph 处理流程级错误                                   │
│     • 统一的错误日志                                             │
│                                                                  │
│  4. 可观测性                                                     │
│     ─────────                                                    │
│     • Logfire 追踪 PydanticAI                                    │
│     • LangSmith 追踪 LangGraph                                   │
│     • 或统一使用 OpenTelemetry                                   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

4.2 代码组织

project/
├── agents/                    # PydanticAI Agents
│   ├── __init__.py
│   ├── intent.py             # 意图识别 Agent
│   ├── order.py              # 订单处理 Agent
│   └── support.py            # 技术支持 Agent

├── workflows/                 # LangGraph 工作流
│   ├── __init__.py
│   ├── customer_service.py   # 客服工作流
│   └── approval.py           # 审批工作流

├── shared/                    # 共享资源
│   ├── __init__.py
│   ├── dependencies.py       # 共享依赖
│   ├── models.py             # 共享数据模型
│   └── utils.py              # 工具函数

├── tests/
│   ├── test_agents.py        # Agent 测试
│   └── test_workflows.py     # 工作流测试

└── main.py                   # 入口

4.3 测试策略

python
import pytest
from pydantic_ai.models.test import TestModel

# 单独测试 PydanticAI Agent
@pytest.fixture
def test_intent_agent():
    return Agent(
        TestModel(custom_output_text='{"intent": "order", "confidence": 0.95, "entities": {}}'),
        output_type=IntentResult,
    )

def test_intent_recognition(test_intent_agent):
    result = test_intent_agent.run_sync("查询订单")
    assert result.output.intent == "order"
    assert result.output.confidence > 0.9

# 测试集成工作流
@pytest.fixture
def mock_deps():
    return CustomerServiceDeps(
        customer_id="TEST001",
        db_connection=MockDB(),
        order_api=MockOrderAPI(),
    )

async def test_workflow(mock_deps):
    workflow = create_customer_service_graph()

    # 使用测试模型覆盖
    with intent_agent.override(model=TestModel()):
        result = await workflow.ainvoke({
            "user_input": "查询订单",
            "deps": mock_deps,
            # ...
        })

    assert result["final_response"] is not None

5. 注意事项

5.1 常见问题

问题原因解决方案
依赖传递失败状态类型不匹配确保 deps 类型一致
类型丢失LangGraph 状态是 dict使用 TypedDict 保持类型
异步混乱同步/异步混用统一使用 async
状态污染节点修改共享状态返回新状态,不要修改

5.2 性能优化

python
# 1. 并行执行独立 Agent
import asyncio

async def parallel_experts(state: State) -> dict:
    # 并行调用多个专家
    tasks = [
        expert1.run(state["query"]),
        expert2.run(state["query"]),
        expert3.run(state["query"]),
    ]
    results = await asyncio.gather(*tasks)
    return {"expert_results": results}

# 2. 缓存 Agent 结果
from functools import lru_cache

@lru_cache(maxsize=100)
def cached_analysis(text_hash: str):
    # 缓存分析结果
    pass

# 3. 流式处理
async def streaming_node(state: State) -> dict:
    async with agent.run_stream(state["input"]) as stream:
        chunks = []
        async for chunk in stream.stream_text(delta=True):
            chunks.append(chunk)
            # 可以实时发送到前端
        return {"response": "".join(chunks)}

6. 小结

PydanticAI 和 LangGraph 的协同集成可以:

收益说明
最佳开发体验PydanticAI 的类型安全 + 简洁 API
最强编排能力LangGraph 的流程控制 + 状态管理
灵活架构多种集成模式适应不同场景
可维护性清晰的职责分离

推荐模式

  • 模式一(PydanticAI as Node)最常用
  • 模式三(共享状态)最灵活
  • 根据项目需求选择合适的模式

上一章:与 LangGraph 对比下一章:实践案例

基于 MIT 许可证发布。内容版权归作者所有。