Skip to content

7.3 实战:多 API 集成系统

🎯 小白理解:为什么需要"多 API 集成"?

现实中,你可能需要同时使用多个 AI 服务:

| 场景 | 解决方案 |
| --- | --- |
| OpenAI 太贵 | 有些任务用便宜的模型 |
| OpenAI 挂了 | 自动切换到 Anthropic |
| 需要对比 | 同时问多个 AI,看谁答得好 |

本节的架构

                ┌─────────────────────────────┐
                │     MultiLLMManager         │
                │   (多 API 管理器)            │
                └──────────┬──────────────────┘
                           │
       ┌───────────────────┼───────────────────┐
       │                   │                   │
       ▼                   ▼                   ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ OpenAI      │    │ Anthropic   │    │ Google      │
│ Client      │    │ Client      │    │ Client      │
└─────────────┘    └─────────────┘    └─────────────┘

核心思想:统一接口 + 智能路由 + 自动降级

统一 API 接口设计

🎯 小白理解:什么是"统一接口"?

不同的 AI 公司,API 格式都不一样:

  • OpenAI:`response["choices"][0]["message"]["content"]`
  • Anthropic:`response["content"][0]["text"]`

每次换 API 都要改代码?太麻烦!

统一接口的做法:

python
# Illustrative snippet: `client` and `messages` are assumed to be defined elsewhere.
# Whichever API sits behind `client`, the call looks the same.
response = client.chat_completion(messages)
print(response.content)  # the response exposes the same `content` field for every provider

这就是"面向接口编程"——把不同的实现藏在统一的接口后面。

python
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum

class APIProvider(Enum):
    """LLM API providers known to this module; each value is the provider's lowercase name."""
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GOOGLE = "google"
    DEEPSEEK = "deepseek"

@dataclass
class Message:
    """Provider-independent chat message."""
    role: str  # one of: system, user, assistant
    content: str  # message text

@dataclass
class CompletionResponse:
    """Provider-independent completion result."""
    content: str  # generated text
    model: str  # model name
    provider: APIProvider  # provider that produced this response
    usage: Dict[str, int]  # token counts (e.g. prompt_tokens, completion_tokens, total_tokens)
    raw_response: Dict[str, Any]  # provider response as a dict, kept for provider-specific fields

class BaseLLMClient(ABC):
    """Abstract base class for LLM clients.

    Stores the API key and model name, and declares the synchronous and
    asynchronous chat-completion methods every concrete client must implement.
    """
    
    def __init__(self, api_key: str, model: str):
        """Remember the credentials and the model name used for requests."""
        self.api_key = api_key
        self.model = model
    
    @abstractmethod
    def chat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Run a chat completion synchronously.

        Args:
            messages: Conversation in the unified ``Message`` format.
            temperature: Sampling temperature passed to the provider.
            max_tokens: Upper bound on generated tokens.

        Returns:
            The provider's answer in the unified ``CompletionResponse`` format.
        """
        pass
    
    @abstractmethod
    async def achat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Asynchronous counterpart of ``chat_completion`` with the same arguments."""
        pass

OpenAI 客户端实现

python
import requests
import httpx
from typing import List, Dict, Any

class OpenAIClient(BaseLLMClient):
    """Client for the OpenAI chat-completions HTTP API.

    The synchronous and asynchronous entry points send the same request and
    parse the response the same way; that shared logic lives in private
    helpers so the two paths cannot drift apart.
    """

    def __init__(self, api_key: str, model: str = "gpt-4"):
        """Store credentials/model and set the API base URL."""
        super().__init__(api_key, model)
        self.base_url = "https://api.openai.com/v1"

    def _headers(self) -> Dict[str, str]:
        """Return the HTTP headers (bearer auth + JSON content type)."""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

    def _convert_messages(self, messages: List[Message]) -> List[Dict[str, str]]:
        """Convert unified messages to the OpenAI ``{"role", "content"}`` dicts."""
        return [
            {"role": msg.role, "content": msg.content}
            for msg in messages
        ]

    def _endpoint(self) -> str:
        """Return the chat-completions URL."""
        return f"{self.base_url}/chat/completions"

    def _build_payload(
        self,
        messages: List[Message],
        temperature: float,
        max_tokens: int
    ) -> Dict[str, Any]:
        """Build the JSON request body shared by the sync and async calls."""
        return {
            "model": self.model,
            "messages": self._convert_messages(messages),
            "temperature": temperature,
            "max_tokens": max_tokens
        }

    def _parse_response(self, data: Dict[str, Any]) -> CompletionResponse:
        """Map a decoded OpenAI response body onto ``CompletionResponse``."""
        return CompletionResponse(
            content=data["choices"][0]["message"]["content"],
            model=data["model"],
            provider=APIProvider.OPENAI,
            # Tolerate a missing/null usage block instead of failing an
            # otherwise successful call.
            usage=data.get("usage") or {},
            raw_response=data
        )

    def chat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Run a chat completion synchronously via ``requests``.

        Args:
            messages: Conversation in the unified ``Message`` format.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.

        Returns:
            The parsed ``CompletionResponse``.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        response = requests.post(
            self._endpoint(),
            headers=self._headers(),
            json=self._build_payload(messages, temperature, max_tokens),
            timeout=30
        )
        response.raise_for_status()
        return self._parse_response(response.json())

    async def achat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Run a chat completion asynchronously via ``httpx``.

        Takes the same arguments and returns the same result as
        ``chat_completion``.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self._endpoint(),
                headers=self._headers(),
                json=self._build_payload(messages, temperature, max_tokens),
                timeout=30
            )
            response.raise_for_status()
            return self._parse_response(response.json())

Anthropic 客户端实现

python
class AnthropicClient(BaseLLMClient):
    """Client for the Anthropic Messages HTTP API.

    The synchronous and asynchronous entry points share request building and
    response parsing through private helpers.
    """

    def __init__(self, api_key: str, model: str = "claude-3-5-sonnet-20241022"):
        """Store credentials/model and set the API base URL."""
        super().__init__(api_key, model)
        self.base_url = "https://api.anthropic.com/v1"

    def _headers(self) -> Dict[str, str]:
        """Return the HTTP headers (API key, API version, JSON content type)."""
        return {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json"
        }

    def _convert_messages(self, messages: List[Message]) -> tuple:
        """Split unified messages into ``(system, messages)`` for Anthropic.

        Anthropic takes the system prompt as a separate top-level field, so
        system messages are pulled out of the conversation. Several system
        messages are joined with a blank line rather than keeping only the
        last one.

        Returns:
            ``(system, converted)`` where ``system`` is the combined system
            prompt (``None`` when there is none) and ``converted`` is the list
            of remaining ``{"role", "content"}`` dicts.
        """
        system_parts = []
        converted_messages = []

        for msg in messages:
            if msg.role == "system":
                system_parts.append(msg.content)
            else:
                converted_messages.append({
                    "role": msg.role,
                    "content": msg.content
                })

        system = "\n\n".join(system_parts) if system_parts else None
        return system, converted_messages

    def _endpoint(self) -> str:
        """Return the messages URL."""
        return f"{self.base_url}/messages"

    def _build_payload(
        self,
        messages: List[Message],
        temperature: float,
        max_tokens: int
    ) -> Dict[str, Any]:
        """Build the JSON request body shared by the sync and async calls."""
        system, converted_messages = self._convert_messages(messages)

        payload = {
            "model": self.model,
            "messages": converted_messages,
            "max_tokens": max_tokens,
            "temperature": temperature
        }

        # Only send the field when there is a non-empty system prompt.
        if system:
            payload["system"] = system

        return payload

    def _parse_response(self, data: Dict[str, Any]) -> CompletionResponse:
        """Map a decoded Anthropic response body onto ``CompletionResponse``.

        The reply text is the concatenation of all ``text`` content blocks, so
        a response whose first block is not text does not raise. Token usage
        is renamed to the prompt/completion/total keys used by the unified
        format.
        """
        text = "".join(
            block.get("text", "")
            for block in data["content"]
            if block.get("type") == "text"
        )
        input_tokens = data["usage"]["input_tokens"]
        output_tokens = data["usage"]["output_tokens"]

        return CompletionResponse(
            content=text,
            model=data["model"],
            provider=APIProvider.ANTHROPIC,
            usage={
                "prompt_tokens": input_tokens,
                "completion_tokens": output_tokens,
                "total_tokens": input_tokens + output_tokens
            },
            raw_response=data
        )

    def chat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Run a chat completion synchronously via ``requests``.

        Args:
            messages: Conversation in the unified ``Message`` format.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.

        Returns:
            The parsed ``CompletionResponse``.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        response = requests.post(
            self._endpoint(),
            headers=self._headers(),
            json=self._build_payload(messages, temperature, max_tokens),
            timeout=30
        )
        response.raise_for_status()
        return self._parse_response(response.json())

    async def achat_completion(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Run a chat completion asynchronously via ``httpx``.

        Takes the same arguments and returns the same result as
        ``chat_completion``.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self._endpoint(),
                headers=self._headers(),
                json=self._build_payload(messages, temperature, max_tokens),
                timeout=30
            )
            response.raise_for_status()
            return self._parse_response(response.json())

多 API 管理器

python
import os
from typing import Optional, Dict, List
import asyncio

class MultiLLMManager:
    """Registry of configured LLM clients with a single call interface.

    A client is created for each provider whose API key is present in the
    environment; calls are then dispatched to a provider by enum value.
    """

    def __init__(self):
        """Create the registry and populate it from environment variables."""
        self.clients: Dict[APIProvider, BaseLLMClient] = {}
        self._initialize_clients()

    def _initialize_clients(self):
        """Create a client for every provider whose API key env var is set."""
        # OpenAI: OPENAI_API_KEY, optional OPENAI_MODEL
        if openai_key := os.getenv("OPENAI_API_KEY"):
            self.clients[APIProvider.OPENAI] = OpenAIClient(
                api_key=openai_key,
                model=os.getenv("OPENAI_MODEL", "gpt-4")
            )

        # Anthropic: ANTHROPIC_API_KEY, optional ANTHROPIC_MODEL
        if anthropic_key := os.getenv("ANTHROPIC_API_KEY"):
            self.clients[APIProvider.ANTHROPIC] = AnthropicClient(
                api_key=anthropic_key,
                model=os.getenv("ANTHROPIC_MODEL", "claude-3-5-sonnet-20241022")
            )

    def get_client(self, provider: APIProvider) -> Optional[BaseLLMClient]:
        """Return the client for ``provider``, or ``None`` if it is not configured."""
        return self.clients.get(provider)

    def list_providers(self) -> List[APIProvider]:
        """Return a new list of the configured providers."""
        return list(self.clients.keys())

    def _require_client(self, provider: APIProvider) -> BaseLLMClient:
        """Return the client for ``provider`` or raise ``ValueError`` if missing."""
        client = self.get_client(provider)
        if client is None:
            raise ValueError(f"提供商 {provider.value} 未配置")
        return client

    def chat_completion(
        self,
        messages: List[Message],
        provider: APIProvider = APIProvider.OPENAI,
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Run a synchronous chat completion on the given provider.

        Raises:
            ValueError: If ``provider`` has no configured client.
        """
        client = self._require_client(provider)
        return client.chat_completion(messages, temperature, max_tokens)

    async def achat_completion(
        self,
        messages: List[Message],
        provider: APIProvider = APIProvider.OPENAI,
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Run an asynchronous chat completion on the given provider.

        Raises:
            ValueError: If ``provider`` has no configured client.
        """
        client = self._require_client(provider)
        return await client.achat_completion(messages, temperature, max_tokens)

    async def parallel_completion(
        self,
        messages: List[Message],
        providers: Optional[List[APIProvider]] = None,
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> Dict[APIProvider, CompletionResponse]:
        """Query several providers concurrently and collect the successes.

        Args:
            messages: Conversation in the unified ``Message`` format.
            providers: Providers to query; defaults to every configured one.
                Unconfigured providers are skipped and duplicates are queried
                only once.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.

        Returns:
            A dict mapping each provider that answered successfully to its
            response. Providers whose call failed are omitted (best effort).
        """
        if providers is None:
            providers = self.list_providers()

        # dict.fromkeys de-duplicates while preserving the caller's order, so
        # a provider listed twice is not billed twice.
        valid_providers = [
            provider for provider in dict.fromkeys(providers)
            if provider in self.clients
        ]

        tasks = [
            self.achat_completion(messages, provider, temperature, max_tokens)
            for provider in valid_providers
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter on BaseException: a cancelled child is reported as
        # asyncio.CancelledError, which is not an Exception subclass and would
        # otherwise be returned as if it were a response.
        return {
            provider: result
            for provider, result in zip(valid_providers, results)
            if not isinstance(result, BaseException)
        }

# Usage example
manager = MultiLLMManager()

# List the providers that were configured
print(f"可用提供商: {[p.value for p in manager.list_providers()]}")

# Call OpenAI through the manager
messages = [
    Message(role="system", content="你是一个有用的助手。"),
    Message(role="user", content="解释什么是 API")
]

try:
    response = manager.chat_completion(messages, provider=APIProvider.OPENAI)
    print(f"\n{response.provider.value} 响应:")
    print(response.content)
    print(f"Token 使用: {response.usage}")

except Exception as e:
    # Covers both an unconfigured provider (ValueError) and request failures
    print(f"错误: {e}")

# Concurrent async calls
async def demo_parallel():
    """Query every configured provider concurrently and print a preview of each reply."""
    responses = await manager.parallel_completion(messages)
    
    for provider, response in responses.items():
        print(f"\n{provider.value} 响应:")
        print(response.content[:200] + "...")  # first 200 characters only
        print(f"Token: {response.usage}")

# Uncomment to run the demo:
# asyncio.run(demo_parallel())

智能路由系统

🎯 小白理解:什么是"智能路由"?

想象你是外卖平台,要把订单分配给骑手:

  • 笨方法:永远只用第一个骑手(可能他已经送不过来了)
  • 智能方法:看谁离得近、谁评分高、谁还有空

智能路由做了什么?

1. 统计每个 API 的表现(成功率、速度、成本)
2. 自动选择最优的 API
3. 如果失败了,自动切换到备用 API(降级)

关键指标

| 指标 | 含义 |
| --- | --- |
| 错误率 | 调用失败的比例 |
| 平均延迟 | 响应速度 |
| 总 Token | 消耗了多少资源 |

通过这些指标,自动选择"又快又稳"的 API!

python
from typing import Optional, List
import time

class SmartRouter:
    """Route chat completions across providers and track per-provider stats.

    For every provider the router records the number of calls, the number of
    errors, the total tokens consumed and a moving-average latency. These
    stats drive ``select_provider``; ``chat_completion_with_fallback`` tries
    providers one after another until one succeeds.
    """

    def __init__(self, manager: MultiLLMManager):
        """Attach to ``manager`` and start with zeroed stats for every provider."""
        self.manager = manager
        self.usage_stats: Dict[APIProvider, Dict[str, Any]] = {
            provider: {
                "calls": 0,
                "errors": 0,
                "total_tokens": 0,
                "avg_latency": 0.0
            }
            for provider in APIProvider
        }

    def _update_stats(
        self,
        provider: APIProvider,
        success: bool,
        tokens: int = 0,
        latency: float = 0.0
    ):
        """Record the outcome of one call to ``provider``.

        Failed calls only bump the call and error counters; successful calls
        also add ``tokens`` and fold ``latency`` (seconds) into the average.
        """
        stats = self.usage_stats[provider]
        stats["calls"] += 1

        if not success:
            stats["errors"] += 1
            return

        stats["total_tokens"] += tokens
        # Exponential moving average; 0 doubles as the "no sample yet" marker.
        if stats["avg_latency"] == 0:
            stats["avg_latency"] = latency
        else:
            stats["avg_latency"] = stats["avg_latency"] * 0.9 + latency * 0.1

    def _score(self, provider: APIProvider) -> float:
        """Return the routing score of a provider that has calls (lower is better)."""
        stats = self.usage_stats[provider]
        error_rate = stats["errors"] / stats["calls"]
        # Error rate dominates: each percentage point weighs like one second
        # of average latency.
        return error_rate * 100 + stats["avg_latency"]

    def select_provider(
        self,
        preferred: Optional[APIProvider] = None,
        fallback: bool = True
    ) -> APIProvider:
        """Pick the provider to use for the next call.

        Args:
            preferred: Provider to use if it is configured.
            fallback: When ``preferred`` is given but not configured, fall
                back to automatic selection (``True``) or raise (``False``).

        Returns:
            ``preferred`` when available; otherwise the first provider that
            has never been called, or else the one with the lowest score.

        Raises:
            ValueError: If no provider is configured, or if ``preferred`` is
                unavailable and ``fallback`` is ``False``.
        """
        available_providers = self.manager.list_providers()

        if not available_providers:
            raise ValueError("没有可用的 API 提供商")

        if preferred is not None:
            if preferred in available_providers:
                return preferred
            # Honour an explicit "no fallback" request instead of silently
            # routing to another provider.
            if not fallback:
                raise ValueError(f"提供商 {preferred.value} 未配置")

        # A provider without any history is tried first so it gets measured.
        for provider in available_providers:
            if self.usage_stats[provider]["calls"] == 0:
                return provider

        # min() keeps the first provider on ties.
        return min(available_providers, key=self._score)

    async def chat_completion_with_fallback(
        self,
        messages: List[Message],
        preferred: Optional[APIProvider] = None,
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> CompletionResponse:
        """Run a chat completion, moving to the next provider on failure.

        Providers are tried in the manager's order, with ``preferred`` moved
        to the front when it is configured. Every attempt updates the stats.

        Returns:
            The response of the first provider that succeeds.

        Raises:
            Exception: If every provider failed; the last provider error is
                attached as the cause.
        """
        providers = self.manager.list_providers()

        # Put the preferred provider first; the rest keep their order.
        if preferred and preferred in providers:
            providers.remove(preferred)
            providers.insert(0, preferred)

        last_error = None

        for provider in providers:
            try:
                # perf_counter is monotonic, so the measured latency is not
                # distorted by system clock adjustments.
                start = time.perf_counter()

                response = await self.manager.achat_completion(
                    messages,
                    provider=provider,
                    temperature=temperature,
                    max_tokens=max_tokens
                )

                latency = time.perf_counter() - start

                self._update_stats(
                    provider,
                    success=True,
                    tokens=response.usage.get("total_tokens", 0),
                    latency=latency
                )

                return response

            except Exception as e:
                print(f"{provider.value} 失败: {e},尝试降级...")
                self._update_stats(provider, success=False)
                last_error = e

        # Every provider failed; keep the last error as the cause.
        raise Exception(
            f"所有 API 提供商都失败了。最后错误: {last_error}"
        ) from last_error

    def get_stats(self) -> Dict[str, Any]:
        """Return the stats of every provider that was called, keyed by provider name."""
        return {
            provider.value: stats
            for provider, stats in self.usage_stats.items()
            if stats["calls"] > 0
        }

# Usage example
async def demo_smart_router():
    """Send five requests through the router, then print the per-provider stats."""
    manager = MultiLLMManager()
    router = SmartRouter(manager)
    
    messages = [
        Message(role="user", content="你好,介绍一下自己")
    ]
    
    # Route each request, preferring OpenAI and falling back on failure
    for i in range(5):
        try:
            response = await router.chat_completion_with_fallback(
                messages,
                preferred=APIProvider.OPENAI
            )
            
            print(f"\n请求 {i+1}:")
            print(f"提供商: {response.provider.value}")
            print(f"响应: {response.content[:100]}...")  # first 100 characters only
        
        except Exception as e:
            # Raised when every provider failed for this request
            print(f"请求失败: {e}")
    
    # Print the statistics report
    print("\n=== 统计报告 ===")
    for provider, stats in router.get_stats().items():
        print(f"\n{provider}:")
        print(f"  调用次数: {stats['calls']}")
        print(f"  错误次数: {stats['errors']}")
        print(f"  平均延迟: {stats['avg_latency']:.2f}秒")
        print(f"  总 Token: {stats['total_tokens']}")

# Uncomment to run the demo:
# asyncio.run(demo_smart_router())

成本管理

🎯 小白理解:为什么要管理 API 成本?

AI API 是按量付费的,用多少付多少:

| 模型 | 输入价格(每 1000 tokens) | 输出价格 |
| --- | --- | --- |
| GPT-4 | $0.03 | $0.06 |
| GPT-3.5 | $0.001 | $0.002 |
| Claude 3.5 | $0.003 | $0.015 |

不管理成本会发生什么?

一个 bug 导致无限循环调用 API

↓

一晚上花掉几百美元

↓

收到信用卡账单时崩溃

成本追踪器能做什么?

  1. 记录每次调用的花费
  2. 统计每日/每月总成本
  3. 设置预算上限,超了就报警
python
from dataclasses import dataclass
from typing import Dict
from datetime import datetime, timedelta

@dataclass
class PricingConfig:
    """Per-model token pricing."""
    input_price_per_1k: float  # price of input tokens, per 1000 tokens
    output_price_per_1k: float  # price of output tokens, per 1000 tokens

# Example pricing table: provider -> model name -> PricingConfig.
# The figures are sample values; check them against current provider price lists.
PRICING = {
    APIProvider.OPENAI: {
        "gpt-4": PricingConfig(input_price_per_1k=0.03, output_price_per_1k=0.06),
        "gpt-3.5-turbo": PricingConfig(input_price_per_1k=0.001, output_price_per_1k=0.002),
    },
    APIProvider.ANTHROPIC: {
        "claude-3-5-sonnet-20241022": PricingConfig(input_price_per_1k=0.003, output_price_per_1k=0.015),
    }
}

class CostTracker:
    """Accumulate API spend per day and compare it to a monthly budget."""

    def __init__(self, monthly_limit: float = 100.0):
        """Start with no recorded spend.

        Args:
            monthly_limit: Monthly budget in US dollars.
        """
        # Maps "YYYY-MM-DD" to the total cost recorded for that day.
        self.daily_costs: Dict[str, float] = {}
        self.monthly_limit: float = monthly_limit

    def calculate_cost(
        self,
        provider: APIProvider,
        model: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """Return the cost of one call according to the ``PRICING`` table.

        Returns:
            The cost in dollars, or ``0.0`` when the provider/model pair has
            no pricing entry.
        """
        pricing = PRICING.get(provider, {}).get(model)

        if not pricing:
            return 0.0

        input_cost = (input_tokens / 1000) * pricing.input_price_per_1k
        output_cost = (output_tokens / 1000) * pricing.output_price_per_1k

        return input_cost + output_cost

    def track_usage(
        self,
        provider: APIProvider,
        model: str,
        usage: Dict[str, int]
    ) -> float:
        """Record one call under today's date and return its cost.

        Args:
            provider: Provider that served the call.
            model: Model name used to look up pricing.
            usage: Token counts; ``prompt_tokens`` and ``completion_tokens``
                are read, and missing keys count as 0.
        """
        cost = self.calculate_cost(
            provider,
            model,
            usage.get("prompt_tokens", 0),
            usage.get("completion_tokens", 0)
        )

        today = datetime.now().strftime("%Y-%m-%d")
        self.daily_costs[today] = self.daily_costs.get(today, 0.0) + cost

        return cost

    def get_monthly_cost(self) -> float:
        """Return the total cost recorded for the current calendar month."""
        # Match on the "YYYY-MM" prefix so only entries dated in this month
        # are counted.
        month_prefix = datetime.now().strftime("%Y-%m")

        return sum(
            cost for date, cost in self.daily_costs.items()
            if date.startswith(month_prefix)
        )

    def check_budget(self) -> Dict[str, Any]:
        """Return the current budget status.

        Returns:
            A dict with ``monthly_cost``, ``monthly_limit``, ``remaining``,
            ``usage_percent`` and ``alert`` (``True`` above 80% usage).
        """
        monthly_cost = self.get_monthly_cost()
        remaining = self.monthly_limit - monthly_cost

        if self.monthly_limit > 0:
            usage_percent = (monthly_cost / self.monthly_limit) * 100
        else:
            # With no budget, avoid dividing by zero: any spend counts as
            # over budget, no spend as 0%.
            usage_percent = float("inf") if monthly_cost > 0 else 0.0

        return {
            "monthly_cost": monthly_cost,
            "monthly_limit": self.monthly_limit,
            "remaining": remaining,
            "usage_percent": usage_percent,
            "alert": usage_percent > 80
        }

# Usage example
tracker = CostTracker()

# Record a single call
usage = {"prompt_tokens": 100, "completion_tokens": 500}
cost = tracker.track_usage(APIProvider.OPENAI, "gpt-4", usage)
print(f"本次调用成本: ${cost:.4f}")

# Check the budget status
budget_status = tracker.check_budget()
print(f"\n本月成本: ${budget_status['monthly_cost']:.2f}")
print(f"预算剩余: ${budget_status['remaining']:.2f}")
print(f"使用率: {budget_status['usage_percent']:.1f}%")

# `alert` is set once usage exceeds 80% of the monthly limit
if budget_status['alert']:
    print("⚠️ 警告: 预算使用已超过 80%!")

关键要点

  1. 统一接口:使用抽象基类统一不同提供商的 API
  2. 智能路由:根据性能和可用性自动选择最佳提供商
  3. 降级策略:主提供商失败时自动切换到备用提供商
  4. 成本管理:追踪和控制 API 使用成本
  5. 统计监控:跟踪调用成功率、延迟等关键指标

下一节:7.4 小结和复习

基于 MIT 许可证发布。内容版权归作者所有。