PydanticAI 实践案例
本章概览
本章提供多个完整的实践案例,帮助你:
- 掌握 PydanticAI 的实际应用
- 学习最佳实践和常见模式
- 理解不同场景下的架构选择
案例一:智能文档分析系统
1.1 需求描述
构建一个文档分析系统,能够:
- 提取文档关键信息
- 生成结构化摘要
- 回答关于文档的问题
1.2 完整实现
python
"""
智能文档分析系统
"""
from dataclasses import dataclass
from typing import Literal
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
# ========== 数据模型 ==========
class DocumentMetadata(BaseModel):
    """Document metadata."""

    title: str
    # Closed set of supported document categories.
    doc_type: Literal["contract", "report", "email", "article", "other"]
    language: str
    word_count: int
    # Defaults to a fresh empty list when no dates are supplied.
    key_dates: list[str] = Field(default_factory=list)
class DocumentSummary(BaseModel):
    """Document summary."""

    # The Field descriptions below are runtime strings and are left untouched.
    executive_summary: str = Field(description="执行摘要,2-3句话")
    main_points: list[str] = Field(description="主要观点,3-5条")
    # Both optional lists default to empty.
    action_items: list[str] = Field(default_factory=list, description="待办事项")
    risks_or_concerns: list[str] = Field(default_factory=list, description="风险或关注点")
class QuestionAnswer(BaseModel):
    """Result of a question asked about a document."""

    answer: str
    # Three-level confidence rating.
    confidence: Literal["high", "medium", "low"]
    # Optional quotation backing the answer; None when absent.
    source_quote: str | None = Field(default=None, description="来源引用")
# ========== 依赖 ==========
@dataclass
class DocumentContext:
    """Document context handed to the agents as their dependency object.

    Attributes:
        content: Text of the document.
        filename: Name of the file the text belongs to.
        chunk_size: Maximum number of characters per chunk.
    """

    content: str
    filename: str
    chunk_size: int = 4000

    def get_chunks(self) -> list[str]:
        """Split ``content`` into consecutive slices of at most ``chunk_size`` characters.

        Returns:
            The chunks in document order; an empty list for empty content.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if self.chunk_size <= 0:
            # A negative step would make range() empty and silently drop the
            # whole document, so reject it explicitly.
            raise ValueError(f"chunk_size must be positive, got {self.chunk_size}")
        return [
            self.content[start:start + self.chunk_size]
            for start in range(0, len(self.content), self.chunk_size)
        ]
# ========== Agents ==========
# Metadata extraction agent.
# Its output type is DocumentMetadata and its dependency type is DocumentContext.
metadata_agent = Agent(
    'openai:gpt-4o',
    output_type=DocumentMetadata,
    deps_type=DocumentContext,
    instructions='''
你是文档元数据提取专家。分析文档内容,提取:
- 标题(如果没有明确标题,根据内容推断)
- 文档类型
- 语言
- 大致字数
- 关键日期
'''
)
@metadata_agent.tool
def get_document_info(ctx: RunContext[DocumentContext]) -> str:
    """Return the document's file name and its length in characters."""
    doc = ctx.deps
    char_count = len(doc.content)
    return f"文件名: {doc.filename}\n内容长度: {char_count} 字符"
# Summary generation agent.
# Its output type is DocumentSummary and its dependency type is DocumentContext.
summary_agent = Agent(
    'openai:gpt-4o',
    output_type=DocumentSummary,
    deps_type=DocumentContext,
    instructions='''
你是文档摘要专家。生成全面但简洁的摘要:
- 执行摘要:2-3句话概括核心内容
- 主要观点:3-5个要点
- 待办事项:如果有的话
- 风险或关注点:如果有的话
保持客观,不要添加原文没有的信息。
'''
)
@summary_agent.tool
def get_full_content(ctx: RunContext[DocumentContext]) -> str:
    """Return the complete document text."""
    document = ctx.deps
    return document.content
# Question-answering agent.
# Its output type is QuestionAnswer and its dependency type is DocumentContext.
qa_agent = Agent(
    'openai:gpt-4o',
    output_type=QuestionAnswer,
    deps_type=DocumentContext,
    instructions='''
你是文档问答专家。基于文档内容回答问题:
- 只根据文档内容回答
- 如果文档中没有相关信息,明确说明
- 尽可能引用原文
- 给出置信度评估
'''
)
@qa_agent.tool
def search_document(ctx: RunContext[DocumentContext], query: str) -> str:
    """Search the document for ``query`` and return the surrounding text (simulated search).

    Matching is case-insensitive and only the first occurrence is used.

    Args:
        ctx: Run context whose ``deps.content`` holds the document text.
        query: Text to look for.

    Returns:
        The match with up to 200 characters of context on each side, or
        "未找到相关内容" when the query is blank or does not occur.
    """
    import re

    # A real application could use vector search here.
    if not query.strip():
        # An empty string "matches" at offset 0, which would return the head
        # of the document as if it were a hit.
        return "未找到相关内容"

    text = ctx.deps.content
    # Search the original text directly: offsets taken from a lowercased copy
    # can drift because lower() may change the length of some characters.
    match = re.search(re.escape(query), text, re.IGNORECASE)
    if match is None:
        return "未找到相关内容"

    # Measure the trailing window from the end of the match so that a long
    # query is never cut off.
    start = max(0, match.start() - 200)
    end = min(len(text), match.end() + 200)
    return text[start:end]
# ========== 主服务类 ==========
class DocumentAnalyzer:
    """Document analysis service built on the metadata, summary and Q&A agents."""

    async def analyze(self, content: str, filename: str) -> dict:
        """Extract metadata and a summary for one document.

        Returns:
            A dict with the keys "metadata" and "summary" holding the output
            of the corresponding agent run.
        """
        import asyncio

        doc_ctx = DocumentContext(content=content, filename=filename)
        # Both runs are awaited together so they execute concurrently.
        meta_run, summary_run = await asyncio.gather(
            metadata_agent.run("提取此文档的元数据", deps=doc_ctx),
            summary_agent.run("生成此文档的摘要", deps=doc_ctx),
        )
        return {"metadata": meta_run.output, "summary": summary_run.output}

    async def ask(
        self,
        content: str,
        filename: str,
        question: str
    ) -> QuestionAnswer:
        """Answer ``question`` about the given document text."""
        doc_ctx = DocumentContext(content=content, filename=filename)
        prompt = f"问题:{question}"
        answer_run = await qa_agent.run(prompt, deps=doc_ctx)
        return answer_run.output
# ========== 使用示例 ==========
async def main():
    """Demo: analyse a sample contract, print the results, then ask one question."""
    # Sample document
    document = """
合同编号:CT-2024-001
签订日期:2024年12月15日
甲方:ABC科技有限公司
乙方:XYZ服务有限公司
第一条 服务内容
乙方为甲方提供云计算基础设施服务,包括:
1. 服务器托管
2. 数据备份
3. 安全监控
第二条 服务期限
本合同有效期为一年,自2025年1月1日起至2025年12月31日止。
第三条 费用
年服务费用:人民币100,000元整。
付款方式:每季度支付25,000元,于每季度首月15日前支付。
第四条 违约责任
任何一方违约,需赔偿对方损失的150%。
(以下签章)
"""
    filename = "合同-2024-001.pdf"
    question = "服务费用是多少?如何付款?"
    analyzer = DocumentAnalyzer()

    # Analyse the document
    print("正在分析文档...\n")
    analysis = await analyzer.analyze(document, filename)
    metadata = analysis['metadata']
    summary = analysis['summary']

    print("=== 文档元数据 ===")
    print(f"标题: {metadata.title}")
    print(f"类型: {metadata.doc_type}")
    print(f"关键日期: {metadata.key_dates}")

    print("\n=== 文档摘要 ===")
    print(f"执行摘要: {summary.executive_summary}")
    print("主要观点:")
    for point in summary.main_points:
        print(f" • {point}")

    # Question answering
    print("\n=== 问答测试 ===")
    answer = await analyzer.ask(document, filename, question)
    print(f"问: {question}")
    print(f"答: {answer.answer}")
    print(f"置信度: {answer.confidence}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
案例二:代码审查助手
2.1 需求描述
构建一个代码审查助手,能够:
- 检测代码问题
- 提供改进建议
- 评估代码质量
2.2 完整实现
python
"""
代码审查助手
"""
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from typing import Literal
from enum import Enum
# ========== 数据模型 ==========
class Severity(str, Enum):
    """Severity level of a code issue; members are also plain strings."""

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
class CodeIssue(BaseModel):
    """A single code issue."""

    # Optional; None when no line number is given.
    line_number: int | None = None
    severity: Severity
    # Closed set of issue categories.
    category: Literal[
        "security", "performance", "maintainability",
        "bug", "style", "best_practice"
    ]
    description: str
    suggestion: str
    # Optional code excerpt; None when absent.
    code_snippet: str | None = None
class CodeReviewResult(BaseModel):
    """Code review result."""

    # Constrained to the inclusive range 0-100.
    overall_score: int = Field(ge=0, le=100, description="总体评分")
    summary: str
    issues: list[CodeIssue]
    # Both lists default to empty.
    strengths: list[str] = Field(default_factory=list)
    improvement_priority: list[str] = Field(default_factory=list)
class RefactoredCode(BaseModel):
    """Refactored code together with a description of the changes."""

    code: str
    changes_made: list[str]
    explanation: str
# ========== Agents ==========
# Code review agent; its output type is CodeReviewResult.
reviewer_agent = Agent(
    'anthropic:claude-sonnet-4-0',
    output_type=CodeReviewResult,
    instructions='''
你是资深代码审查专家,精通多种编程语言。审查代码时:
1. 安全性检查:
- SQL 注入、XSS、命令注入
- 敏感信息泄露
- 权限控制问题
2. 性能检查:
- 算法复杂度
- 内存泄漏风险
- 不必要的计算
3. 可维护性检查:
- 代码复杂度
- 命名规范
- 模块化程度
4. Bug 检查:
- 边界条件
- 空指针/None 检查
- 类型错误
给出 0-100 的评分,识别问题,也要指出优点。
'''
)
# Code refactoring agent; its output type is RefactoredCode.
refactor_agent = Agent(
    'anthropic:claude-sonnet-4-0',
    output_type=RefactoredCode,
    instructions='''
你是代码重构专家。根据审查意见重构代码:
- 保持功能不变
- 修复所有问题
- 提高代码质量
- 添加必要的注释
解释每个更改的原因。
'''
)
# ========== 服务类 ==========
class CodeReviewer:
    """Code review service wrapping the reviewer and refactoring agents."""

    async def review(self, code: str, language: str = "python") -> CodeReviewResult:
        """Have the reviewer agent review ``code`` and return its structured result."""
        review_run = await reviewer_agent.run(f"""
请审查以下 {language} 代码:
```{language}
{code}
```
""")
        return review_run.output

    async def refactor(
        self,
        code: str,
        review: CodeReviewResult,
        language: str = "python"
    ) -> RefactoredCode:
        """Refactor ``code`` according to the issues listed in ``review``."""
        # One bullet per issue, tagged with its severity value.
        bullet_lines = (
            f"- [{found.severity.value}] {found.description}"
            for found in review.issues
        )
        issues_text = "\n".join(bullet_lines)
        prompt = f"""
原始代码:
```{language}
{code}
```
需要修复的问题:
{issues_text}
请重构代码,修复所有问题。
"""
        refactor_run = await refactor_agent.run(prompt)
        return refactor_run.output

    async def full_review(
        self,
        code: str,
        language: str = "python"
    ) -> dict:
        """Review the code and refactor it when a critical or high issue is found.

        Returns:
            A dict with "review" (the review result) and "refactored" (the
            refactoring result, or None when no refactoring was triggered).
        """
        # Step 1: review.
        review = await self.review(code, language)

        # Step 2: refactor only when at least one serious issue was reported.
        blocking = (Severity.CRITICAL, Severity.HIGH)
        needs_refactor = any(found.severity in blocking for found in review.issues)
        if needs_refactor:
            refactored = await self.refactor(code, review, language)
        else:
            refactored = None

        return {"review": review, "refactored": refactored}
# ========== 使用示例 ==========
async def main():
    """Demo: review a deliberately flawed snippet and print the findings."""
    # Sample code containing several problems
    problematic_code = '''
def get_user(user_id):
# 直接拼接 SQL,有注入风险
query = "SELECT * FROM users WHERE id = " + user_id
result = db.execute(query)
return result
def process_data(data):
result = []
for i in range(len(data)): # 可以用 enumerate
for j in range(len(data)): # O(n^2) 复杂度
if data[i] == data[j]:
result.append(data[i])
return result
def divide(a, b):
return a / b # 没有处理除零
password = "admin123" # 硬编码密码
'''
    reviewer = CodeReviewer()

    print("=== 代码审查 ===\n")
    outcome = await reviewer.full_review(problematic_code)
    review = outcome["review"]
    refactored = outcome["refactored"]

    print(f"总体评分: {review.overall_score}/100")
    print(f"摘要: {review.summary}\n")

    print("发现的问题:")
    for issue in review.issues:
        severity_tag = issue.severity.value.upper()
        print(f" [{severity_tag}] {issue.category}")
        print(f" {issue.description}")
        print(f" 建议: {issue.suggestion}\n")

    if review.strengths:
        print("代码优点:")
        for strength in review.strengths:
            print(f" ✓ {strength}")

    # The refactoring result is None when no refactoring was triggered.
    if refactored:
        print("\n=== 重构后的代码 ===\n")
        print(refactored.code)
        print("\n更改说明:")
        for change in refactored.changes_made:
            print(f" • {change}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
案例三:多语言翻译系统
3.1 需求描述
构建一个专业翻译系统,支持:
- 多语言互译
- 保持专业术语一致性
- 提供翻译质量评估
3.2 完整实现
python
"""
多语言翻译系统
"""
from dataclasses import dataclass, field
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from typing import Literal
# ========== 数据模型 ==========
class TranslationResult(BaseModel):
    """Translation result."""

    translated_text: str
    source_language: str
    target_language: str
    # Defaults to an empty dict.
    glossary_terms: dict[str, str] = Field(
        default_factory=dict,
        description="使用的专业术语对照"
    )
class QualityAssessment(BaseModel):
    """Translation quality assessment."""

    # Every score is constrained to the inclusive range 0-100.
    accuracy_score: int = Field(ge=0, le=100)
    fluency_score: int = Field(ge=0, le=100)
    terminology_score: int = Field(ge=0, le=100)
    overall_score: int = Field(ge=0, le=100)
    # Both lists default to empty.
    issues: list[str] = Field(default_factory=list)
    suggestions: list[str] = Field(default_factory=list)
# ========== 依赖 ==========
@dataclass
class TranslationContext:
    """Translation context."""

    domain: Literal["general", "legal", "medical", "technical", "financial"]
    glossary: dict[str, str] = field(default_factory=dict)  # glossary of terms
    style_guide: str = ""  # style guide
# ========== Agents ==========
# Translation agent.
# Its output type is TranslationResult and its dependency type is TranslationContext.
translator_agent = Agent(
    'openai:gpt-4o',
    output_type=TranslationResult,
    deps_type=TranslationContext,
    instructions='''
你是专业翻译专家,精通多种语言。翻译时:
- 保持原文意思准确
- 使用自然流畅的目标语言表达
- 遵循领域术语规范
- 保持文档格式
如果有术语表,必须使用术语表中的翻译。
'''
)
@translator_agent.tool
def get_glossary(ctx: RunContext[TranslationContext]) -> str:
    """Return the glossary as one "source -> target" bullet per term, or a notice when it is empty."""
    glossary = ctx.deps.glossary
    if glossary:
        bullet_lines = [f"- {src} -> {tgt}" for src, tgt in glossary.items()]
        terms = "\n".join(bullet_lines)
        return f"术语表:\n{terms}"
    return "无专业术语表"
@translator_agent.tool
def get_style_guide(ctx: RunContext[TranslationContext]) -> str:
    """Return the style guide, or a notice when none is configured."""
    guide = ctx.deps.style_guide
    if guide:
        return guide
    return "无特定风格要求"
# Quality assessment agent; its output type is QualityAssessment.
qa_agent = Agent(
    'openai:gpt-4o',
    output_type=QualityAssessment,
    instructions='''
你是翻译质量评估专家。评估翻译质量:
1. 准确性 (0-100):原文意思是否正确传达
2. 流畅性 (0-100):目标语言是否自然
3. 术语一致性 (0-100):专业术语是否一致
4. 总体评分 (0-100):综合评估
指出具体问题和改进建议。
'''
)
# ========== 服务类 ==========
class TranslationService:
    """Translation service combining the translator and quality-assessment agents."""

    # Predefined glossaries, keyed by domain name.
    DOMAIN_GLOSSARIES = {
        "legal": {
            "contract": "合同",
            "liability": "责任",
            "jurisdiction": "管辖权",
            "plaintiff": "原告",
            "defendant": "被告",
        },
        "medical": {
            "diagnosis": "诊断",
            "symptom": "症状",
            "prescription": "处方",
            "chronic": "慢性的",
            "acute": "急性的",
        },
        "technical": {
            "algorithm": "算法",
            "database": "数据库",
            "API": "应用程序接口",
            "latency": "延迟",
            "throughput": "吞吐量",
        },
        "financial": {
            "asset": "资产",
            "liability": "负债",
            "equity": "权益",
            "dividend": "股息",
            "portfolio": "投资组合",
        },
    }

    async def translate(
        self,
        text: str,
        source_lang: str,
        target_lang: str,
        domain: str = "general",
        custom_glossary: dict | None = None,
    ) -> TranslationResult:
        """Translate ``text`` from ``source_lang`` to ``target_lang``.

        Args:
            text: Text to translate.
            source_lang: Name of the source language, inserted into the prompt.
            target_lang: Name of the target language, inserted into the prompt.
            domain: Key into ``DOMAIN_GLOSSARIES``; a domain without an entry
                starts from an empty glossary.
            custom_glossary: Extra terms; they override domain terms that
                share the same key.

        Returns:
            The translator agent's structured output.
        """
        # Merge into a new dict so the class-level glossaries are never mutated.
        glossary = {
            **self.DOMAIN_GLOSSARIES.get(domain, {}),
            **(custom_glossary or {}),
        }
        ctx = TranslationContext(
            domain=domain,
            glossary=glossary,
        )
        prompt = f"""
将以下{source_lang}文本翻译成{target_lang}:
{text}
"""
        result = await translator_agent.run(prompt, deps=ctx)
        return result.output

    async def assess_quality(
        self,
        source_text: str,
        translated_text: str,
        source_lang: str,
        target_lang: str,
    ) -> QualityAssessment:
        """Have the assessment agent score ``translated_text`` against ``source_text``."""
        prompt = f"""
评估以下翻译的质量:
原文({source_lang}):
{source_text}
译文({target_lang}):
{translated_text}
"""
        result = await qa_agent.run(prompt)
        return result.output

    async def translate_with_qa(
        self,
        text: str,
        source_lang: str,
        target_lang: str,
        domain: str = "general",
        custom_glossary: dict | None = None,
    ) -> dict:
        """Translate ``text`` and then assess the quality of the translation.

        Args:
            text: Text to translate.
            source_lang: Name of the source language.
            target_lang: Name of the target language.
            domain: Glossary domain, forwarded to ``translate``.
            custom_glossary: Optional extra terms, forwarded to ``translate``
                so this method supports the same glossary overrides.

        Returns:
            A dict with "translation" (the translation result) and "quality"
            (the quality assessment).
        """
        # Translate
        translation = await self.translate(
            text, source_lang, target_lang, domain, custom_glossary
        )
        # Assess quality
        assessment = await self.assess_quality(
            text,
            translation.translated_text,
            source_lang,
            target_lang,
        )
        return {
            "translation": translation,
            "quality": assessment,
        }
# ========== 使用示例 ==========
async def main():
    """Demo: translate a short legal text and print the quality assessment."""
    # Legal text to translate
    legal_text = """
The plaintiff alleges that the defendant breached the contract
by failing to deliver the goods within the specified jurisdiction.
The plaintiff seeks damages for the defendant's liability.
"""
    service = TranslationService()

    print("=== 法律文本翻译 ===\n")
    print(f"原文:\n{legal_text}")
    outcome = await service.translate_with_qa(
        legal_text,
        source_lang="英语",
        target_lang="中文",
        domain="legal"
    )
    translation = outcome['translation']
    print(f"\n译文:\n{translation.translated_text}")
    print(f"\n使用的术语:\n{translation.glossary_terms}")

    qa = outcome["quality"]
    print("\n质量评估:")
    score_rows = (
        ("准确性", qa.accuracy_score),
        ("流畅性", qa.fluency_score),
        ("术语一致性", qa.terminology_score),
        ("总体评分", qa.overall_score),
    )
    for label, score in score_rows:
        print(f" {label}: {score}/100")

    # Each optional section is printed only when it has entries.
    if qa.issues:
        print("\n问题:")
        for issue in qa.issues:
            print(f" - {issue}")
    if qa.suggestions:
        print("\n建议:")
        for suggestion in qa.suggestions:
            print(f" - {suggestion}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
案例四:API 服务集成
4.1 需求描述
将 PydanticAI Agent 部署为 RESTful API 服务。
4.2 FastAPI 集成
python
"""
PydanticAI + FastAPI 集成示例
"""
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from typing import Literal
from contextlib import asynccontextmanager
import logfire
# ========== 配置 Logfire ==========
logfire.configure()
logfire.instrument_pydantic_ai()
# ========== 数据模型 ==========
class ChatRequest(BaseModel):
    """Chat request."""

    message: str
    # Optional; None when the caller does not supply one.
    conversation_id: str | None = None
class ChatResponse(BaseModel):
    """Chat response."""

    response: str
    conversation_id: str
class SentimentRequest(BaseModel):
    """Sentiment analysis request."""

    text: str
class SentimentResult(BaseModel):
    """Sentiment analysis result."""

    sentiment: Literal["positive", "negative", "neutral"]
    # Constrained to the inclusive range 0-1.
    confidence: float = Field(ge=0, le=1)
    keywords: list[str]
# ========== Agents ==========
# Chat agent; no output_type is configured for it.
chat_agent = Agent(
    'openai:gpt-4o',
    instructions='你是一个友好的助手,使用中文回答问题。保持回复简洁。'
)
# Sentiment agent; its output type is SentimentResult.
sentiment_agent = Agent(
    'openai:gpt-4o',
    output_type=SentimentResult,
    instructions='分析文本情感,提取关键词。'
)
# ========== 对话存储 ==========
# In-process store mapping a conversation id to its list of messages.
# NOTE(review): nothing bounds or expires this dict; presumably fine for a
# demo, but confirm before production use.
conversations: dict[str, list] = {}
# ========== FastAPI 应用 ==========
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: print one message before the yield and one after it."""
    # Runs when the context is entered.
    print("🚀 Starting PydanticAI API Service")
    yield
    # Runs when the context exits normally.
    print("👋 Shutting down...")
# FastAPI application, wired to the lifespan context manager.
app = FastAPI(
    title="PydanticAI API Service",
    description="AI-powered API using PydanticAI",
    version="1.0.0",
    lifespan=lifespan,
)
# ========== 路由 ==========
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """
    Chat endpoint.

    Supports multi-turn conversations: the stored history for
    ``conversation_id`` is passed to the agent; a new id is generated when
    the request carries none.

    Raises:
        HTTPException: Status 500 when the agent run fails.
    """
    import uuid

    # Get the existing conversation or start a new one.
    conv_id = request.conversation_id or str(uuid.uuid4())
    message_history = conversations.get(conv_id)

    try:
        result = await chat_agent.run(
            request.message,
            # A missing or empty history is passed as None.
            message_history=message_history or None,
        )
    except Exception as e:
        # Chain the cause so the original traceback is kept.
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Save the history only after the run succeeded.
    conversations[conv_id] = result.all_messages()
    return ChatResponse(
        response=result.output,
        conversation_id=conv_id,
    )
@app.post("/analyze/sentiment", response_model=SentimentResult)
async def analyze_sentiment(request: SentimentRequest):
    """
    Sentiment analysis endpoint.

    Analyses the sentiment of the text and extracts keywords.

    Raises:
        HTTPException: Status 500 when the agent run fails.
    """
    try:
        result = await sentiment_agent.run(request.text)
    except Exception as e:
        # Chain the cause so the original traceback is kept.
        raise HTTPException(status_code=500, detail=str(e)) from e
    return result.output
@app.delete("/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete a stored conversation history; respond 404 when the id is unknown."""
    if conversation_id not in conversations:
        raise HTTPException(status_code=404, detail="Conversation not found")
    conversations.pop(conversation_id)
    return {"status": "deleted"}
@app.get("/health")
async def health_check():
    """Health check: report a fixed status and the names of the agents."""
    agent_names = ["chat", "sentiment"]
    return {"status": "healthy", "agents": agent_names}
# ========== 流式响应 ==========
from fastapi.responses import StreamingResponse
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """
    Streaming chat endpoint.

    Returns the generated text as it is produced.
    """
    # NOTE(review): request.conversation_id is not read here, so this endpoint
    # neither loads nor stores history - confirm that is intended.

    async def text_deltas():
        # Errors are reported in-band as a final text line.
        try:
            async with chat_agent.run_stream(request.message) as run:
                async for piece in run.stream_text(delta=True):
                    yield piece
        except Exception as e:
            yield f"\n[Error: {e}]"

    body = text_deltas()
    return StreamingResponse(body, media_type="text/plain")
# ========== 运行 ==========
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
# 使用示例:
# curl -X POST http://localhost:8000/chat \
# -H "Content-Type: application/json" \
# -d '{"message": "你好!"}'
#
# curl -X POST http://localhost:8000/analyze/sentiment \
# -H "Content-Type: application/json" \
# -d '{"text": "这个产品太棒了,我非常喜欢!"}'
案例五:测试驱动开发
5.1 测试策略
python
"""
PydanticAI 测试示例
"""
import os

import pytest
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel
# ========== 被测代码 ==========
class AnalysisOutput(BaseModel):
    # Structured output of the analysis agent under test.
    category: str
    score: float
# Agent under test. defer_model_check=True postpones resolving the named model
# (and the API-key check that comes with it) until the first run, so this
# module can be imported and the model overridden with TestModel without
# OPENAI_API_KEY being set.
analysis_agent = Agent(
    'openai:gpt-4o',
    output_type=AnalysisOutput,
    defer_model_check=True,
)
# ========== 测试代码 ==========
class TestAnalysisAgent:
    """Tests for the analysis agent."""

    @pytest.fixture
    def test_agent(self):
        """Create an agent backed by TestModel with a fixed structured output."""
        return Agent(
            TestModel(
                # The agent has a structured output_type, so the canned result
                # must be supplied as output arguments; custom_output_text is
                # only accepted when plain-text output is allowed.
                custom_output_args={"category": "tech", "score": 0.95}
            ),
            output_type=AnalysisOutput,
        )

    def test_basic_analysis(self, test_agent):
        """The canned values come back on the structured output."""
        result = test_agent.run_sync("分析这段文本")
        assert result.output.category == "tech"
        assert result.output.score == 0.95

    def test_output_type(self, test_agent):
        """The output is an AnalysisOutput with correctly typed fields."""
        result = test_agent.run_sync("测试")
        assert isinstance(result.output, AnalysisOutput)
        assert isinstance(result.output.category, str)
        assert isinstance(result.output.score, float)

    def test_override_model(self):
        """The module-level agent runs with its model overridden by TestModel."""
        with analysis_agent.override(model=TestModel()):
            result = analysis_agent.run_sync("测试")
            assert result.output is not None
class TestWithMockDeps:
    """Tests that use injected dependencies."""

    @pytest.fixture
    def mock_deps(self):
        """Provide a stand-in dependency object with fixed values."""
        from dataclasses import dataclass

        @dataclass
        class MockDeps:
            user_id: int = 1
            api_key: str = "test-key"

        fake = MockDeps()
        return fake

    def test_with_deps(self, mock_deps):
        """Run an agent whose tool reads ``ctx.deps`` and check it produces an output."""
        from dataclasses import dataclass
        from pydantic_ai import RunContext

        @dataclass
        class Deps:
            user_id: int
            api_key: str

        deps_agent = Agent(TestModel(), deps_type=Deps)

        @deps_agent.tool
        def get_user_id(ctx: RunContext[Deps]) -> int:
            return ctx.deps.user_id

        outcome = deps_agent.run_sync("获取用户ID", deps=mock_deps)
        assert outcome.output is not None
# ========== 集成测试 ==========
@pytest.mark.integration
class TestIntegration:
    """Integration tests (require the real API)."""

    @pytest.fixture
    def real_agent(self):
        """Agent backed by a real model."""
        return Agent(
            'openai:gpt-4o-mini',  # use an inexpensive model
            output_type=AnalysisOutput,
        )

    # The skip condition is evaluated while the class body executes, so `os`
    # must be imported at module level.
    @pytest.mark.skipif(
        not os.getenv("OPENAI_API_KEY"),
        reason="需要 OPENAI_API_KEY"
    )
    def test_real_api(self, real_agent):
        """Call the real API and sanity-check the structured output."""
        result = real_agent.run_sync("这是一段关于人工智能的技术文章")
        assert result.output.category is not None
        assert 0 <= result.output.score <= 1
# ========== 运行测试 ==========
# pytest test_agents.py -v
# pytest test_agents.py -v -m "not integration" # 跳过集成测试
5. 小结
本章通过五个完整案例展示了 PydanticAI 的实际应用:
| 案例 | 核心特性 | 应用场景 |
|---|---|---|
| 文档分析 | 结构化输出、依赖注入 | 企业文档处理 |
| 代码审查 | 多 Agent、类型安全 | 开发工具 |
| 翻译系统 | 领域知识、质量评估 | 国际化 |
| API 服务 | FastAPI 集成、流式 | 后端服务 |
| 测试驱动 | TestModel、Mock | 质量保证 |
关键收获:
- 结构化输出让数据处理更可靠
- 依赖注入让代码更模块化
- 多 Agent 协作解决复杂问题
- 测试模型简化测试流程