7.2 HTTP 请求与 API 调用
🎯 小白理解:什么是 HTTP 请求和 API?
想象你去餐厅吃饭:
你(客户端) 服务员(API) 厨房(服务器) │ │ │ │──── 点菜单 ────────→│ │ │ (HTTP 请求) │──── 转达订单 ────→│ │ │ │ │ │←──── 做好的菜 ─────│ │←──── 上菜 ─────────│ │ (HTTP 响应)关键概念:
概念 餐厅比喻 编程解释 API 服务员 程序之间通信的接口 HTTP 请求 你的点菜单 你发给服务器的指令 HTTP 响应 上的菜 服务器返回的数据 GET 请求 问"有什么菜?" 获取数据 POST 请求 "我要点这些菜" 发送数据 OpenAI API 就是这样:你发请求说"请回答这个问题",它返回 AI 的回答。
requests 库基础
🎯 小白理解:
requests是 Python 最流行的 HTTP 库,用来"发送请求、接收响应"。
安装
bash
pip install requests基本请求
🎯 小白理解:HTTP 请求的四种常用方法
方法 用途 类比 GET获取数据 查看菜单 POST发送数据 下订单 PUT更新数据 修改订单 DELETE删除数据 取消订单
python
import requests
from typing import Dict, Any
# GET 请求
response = requests.get("https://api.github.com/users/python")
print(response.status_code) # 200
print(response.json()) # 解析 JSON 响应
# POST 请求
payload = {"name": "Agent", "status": "active"}
response = requests.post(
"https://api.example.com/agents",
json=payload,
headers={"Content-Type": "application/json"}
)
# 带参数的 GET 请求
params = {"q": "python", "per_page": 10}
response = requests.get("https://api.github.com/search/repositories", params=params)
# 带认证的请求
headers = {"Authorization": "Bearer YOUR_TOKEN"}
response = requests.get("https://api.example.com/data", headers=headers)OpenAI API 调用
🎯 小白理解:调用 OpenAI API 的流程
1. 准备请求 ├── API Key(身份证明) ├── 模型名称(gpt-4) └── 消息内容(你的问题) 2. 发送 POST 请求到 OpenAI 服务器 3. 接收响应 └── AI 的回答在 response["choices"][0]["message"]["content"]消息格式:
pythonmessages = [ {"role": "system", "content": "你是一个助手"}, # 系统设定 {"role": "user", "content": "你好"}, # 用户说的话 {"role": "assistant", "content": "你好!"} # AI 的回复 ]这就是 ChatGPT 的工作原理!
python
import os
import requests
from typing import List, Dict, Any, Optional
class OpenAIClient:
"""OpenAI API 客户端"""
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
self.base_url = "https://api.openai.com/v1"
if not self.api_key:
raise ValueError("API Key 未配置")
def _headers(self) -> Dict[str, str]:
"""生成请求头"""
return {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
def chat_completion(
self,
messages: List[Dict[str, str]],
model: str = "gpt-4",
temperature: float = 0.7,
max_tokens: int = 2000
) -> Dict[str, Any]:
"""调用 Chat Completion API"""
url = f"{self.base_url}/chat/completions"
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens
}
response = requests.post(
url,
headers=self._headers(),
json=payload,
timeout=30
)
response.raise_for_status() # 抛出 HTTP 错误
return response.json()
def get_models(self) -> List[str]:
"""获取可用模型列表"""
url = f"{self.base_url}/models"
response = requests.get(
url,
headers=self._headers(),
timeout=10
)
response.raise_for_status()
data = response.json()
return [model["id"] for model in data["data"]]
# 使用示例
client = OpenAIClient()
# 发送消息
messages = [
{"role": "system", "content": "你是一个有用的助手。"},
{"role": "user", "content": "解释什么是 API"}
]
result = client.chat_completion(messages)
print(result["choices"][0]["message"]["content"])
# 获取模型列表
models = client.get_models()
print(f"可用模型: {len(models)} 个")异步 HTTP 请求(httpx)
🎯 小白理解:为什么需要"异步"请求?
同步请求(一个一个来):
请求1 ──等待──→ 响应1 请求2 ──等待──→ 响应2 请求3 ──等待──→ 响应3 总时间: 3秒 + 3秒 + 3秒 = 9秒异步请求(同时发出):
请求1 ──等待──→ 响应1 请求2 ──等待──→ 响应2 (同时进行!) 请求3 ──等待──→ 响应3 总时间: 约 3秒类比:同步 = 一个服务员,一次只能服务一桌;异步 = 一个服务员,可以同时记录多桌的订单
什么时候用异步?
- 需要同时调用多个 API
- 需要同时处理多个用户的请求
- 需要提高程序响应速度
安装
bash
pip install httpx异步 API 客户端
python
import asyncio
import httpx
from typing import List, Dict, Any
import os
class AsyncOpenAIClient:
"""异步 OpenAI API 客户端"""
def __init__(self, api_key: str = None):
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
self.base_url = "https://api.openai.com/v1"
self.client = httpx.AsyncClient(
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
timeout=30.0
)
async def chat_completion(
self,
messages: List[Dict[str, str]],
model: str = "gpt-4",
temperature: float = 0.7
) -> Dict[str, Any]:
"""异步调用 Chat API"""
url = f"{self.base_url}/chat/completions"
payload = {
"model": model,
"messages": messages,
"temperature": temperature
}
response = await self.client.post(url, json=payload)
response.raise_for_status()
return response.json()
async def batch_completion(
self,
queries: List[str],
system_prompt: str = "你是一个有用的助手。"
) -> List[str]:
"""批量处理查询"""
tasks = []
for query in queries:
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": query}
]
tasks.append(self.chat_completion(messages))
results = await asyncio.gather(*tasks)
return [
result["choices"][0]["message"]["content"]
for result in results
]
async def close(self):
"""关闭客户端"""
await self.client.aclose()
# 使用示例
async def demo_async_client():
client = AsyncOpenAIClient()
try:
# 并发处理多个查询
queries = [
"什么是 Python?",
"什么是异步编程?",
"什么是 LangChain?"
]
import time
start = time.time()
results = await client.batch_completion(queries)
end = time.time()
print(f"处理 {len(queries)} 个查询,耗时: {end - start:.2f}秒\n")
for query, result in zip(queries, results):
print(f"Q: {query}")
print(f"A: {result[:100]}...\n")
finally:
await client.close()
# 运行
# asyncio.run(demo_async_client())错误处理与重试
🎯 小白理解:为什么 API 调用需要"重试"?
网络请求可能失败,原因有很多:
错误类型 含义 应对策略 401身份验证失败 检查 API Key 429请求太频繁 等一会儿再试 500服务器内部错误 过会儿重试 Timeout请求超时 重试或增加超时时间 指数退避策略:
第1次失败 → 等 1 秒 → 重试 第2次失败 → 等 2 秒 → 重试 第3次失败 → 等 4 秒 → 重试 第4次失败 → 放弃,报错等待时间翻倍,避免一直"骚扰"已经很忙的服务器。
python
import requests
import time
from typing import Dict, Any, Optional, Callable
from functools import wraps
class APIError(Exception):
"""API 错误基类"""
pass
class RateLimitError(APIError):
"""速率限制错误"""
pass
class AuthenticationError(APIError):
"""认证错误"""
pass
def retry_on_error(
max_retries: int = 3,
initial_delay: float = 1.0,
backoff_factor: float = 2.0,
exceptions: tuple = (requests.RequestException,)
):
"""重试装饰器"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
delay = initial_delay
last_exception = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_retries - 1:
print(f"重试 {attempt + 1}/{max_retries}: {e}")
time.sleep(delay)
delay *= backoff_factor
raise last_exception
return wrapper
return decorator
class RobustAPIClient:
"""健壮的 API 客户端"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.openai.com/v1"
def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
"""处理响应和错误"""
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
raise AuthenticationError("API Key 无效")
elif response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
raise RateLimitError(f"速率限制,请等待 {retry_after} 秒")
elif response.status_code >= 500:
raise APIError(f"服务器错误: {response.status_code}")
else:
raise APIError(f"请求失败: {response.status_code} - {response.text}")
@retry_on_error(max_retries=3, exceptions=(requests.RequestException, RateLimitError))
def chat_completion(
self,
messages: List[Dict[str, str]],
model: str = "gpt-4"
) -> Dict[str, Any]:
"""调用 Chat API(带重试)"""
url = f"{self.base_url}/chat/completions"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": messages
}
try:
response = requests.post(
url,
headers=headers,
json=payload,
timeout=30
)
return self._handle_response(response)
except requests.Timeout:
raise APIError("请求超时")
except requests.ConnectionError:
raise APIError("连接失败")
# 使用示例
try:
client = RobustAPIClient(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{"role": "user", "content": "Hello"}
]
result = client.chat_completion(messages)
print(result)
except AuthenticationError as e:
print(f"认证错误: {e}")
except RateLimitError as e:
print(f"速率限制: {e}")
except APIError as e:
print(f"API 错误: {e}")流式响应处理
🎯 小白理解:什么是"流式响应"?
普通响应:等 AI 写完整篇文章,一次性返回
用户提问 ──────────────────────→ [等待 10 秒] ──→ 一次性显示全部回答流式响应:AI 边写边返回,像打字一样逐字显示
用户提问 → "你" → "好" → "," → "我" → "是" → "AI" → ...流式响应的好处:
- 体验更好:用户不用干等,能看到 AI 在"思考"
- 感觉更快:第一个字很快就出现了
- ChatGPT 就是这样做的!
技术原理:使用 SSE(Server-Sent Events),服务器持续发送小块数据。
python
import requests
import json
from typing import Iterator, Dict, Any
class StreamingAPIClient:
"""流式 API 客户端"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.openai.com/v1"
def chat_completion_stream(
self,
messages: List[Dict[str, str]],
model: str = "gpt-4"
) -> Iterator[str]:
"""流式调用 Chat API"""
url = f"{self.base_url}/chat/completions"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": messages,
"stream": True # 启用流式响应
}
with requests.post(
url,
headers=headers,
json=payload,
stream=True,
timeout=30
) as response:
response.raise_for_status()
for line in response.iter_lines():
if not line:
continue
# 解码并解析 SSE 格式
line = line.decode('utf-8')
if line.startswith("data: "):
data = line[6:] # 去掉 "data: " 前缀
if data == "[DONE]":
break
try:
chunk = json.loads(data)
delta = chunk["choices"][0]["delta"]
if "content" in delta:
yield delta["content"]
except json.JSONDecodeError:
continue
# 使用示例
client = StreamingAPIClient(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{"role": "user", "content": "写一首关于 Python 的诗"}
]
print("Agent: ", end="", flush=True)
for chunk in client.chat_completion_stream(messages):
print(chunk, end="", flush=True)
print()API 速率限制管理
python
import time
from collections import deque
from typing import Callable, Any
from functools import wraps
class RateLimiter:
"""速率限制器"""
def __init__(self, max_calls: int, period: float):
"""
Args:
max_calls: 时间窗口内最大调用次数
period: 时间窗口(秒)
"""
self.max_calls = max_calls
self.period = period
self.calls = deque()
def __call__(self, func: Callable) -> Callable:
"""装饰器"""
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
now = time.time()
# 清理过期的调用记录
while self.calls and self.calls[0] < now - self.period:
self.calls.popleft()
# 检查是否超过限制
if len(self.calls) >= self.max_calls:
sleep_time = self.period - (now - self.calls[0])
if sleep_time > 0:
print(f"速率限制,等待 {sleep_time:.2f} 秒...")
time.sleep(sleep_time)
# 重新清理
now = time.time()
while self.calls and self.calls[0] < now - self.period:
self.calls.popleft()
# 记录调用
self.calls.append(now)
return func(*args, **kwargs)
return wrapper
# 使用示例
class APIClientWithRateLimit:
"""带速率限制的 API 客户端"""
def __init__(self, api_key: str):
self.api_key = api_key
# 限制: 每分钟最多 20 次调用
self.rate_limiter = RateLimiter(max_calls=20, period=60)
@property
def chat_completion(self):
"""返回带速率限制的方法"""
return self.rate_limiter(self._chat_completion)
def _chat_completion(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
"""实际的 API 调用"""
print(f"调用 API: {time.time():.2f}")
# 实际 API 调用逻辑
return {"status": "success"}
# 测试
client = APIClientWithRateLimit(api_key="test")
# 快速连续调用
for i in range(25):
result = client.chat_completion([{"role": "user", "content": f"消息 {i}"}])关键要点
- 错误处理:使用
try-except处理所有 HTTP 错误 - 重试机制:实现指数退避策略
- 超时设置:始终设置合理的超时时间
- 速率限制:遵守 API 提供商的速率限制
- 流式响应:对于长文本生成,使用流式 API