6.4 小结和复习
本章核心概念
🎯 小白总结:异步编程核心记忆卡
概念 用途 生活比喻 async def定义异步函数 创建一个"可暂停"的任务 await等待操作完成 等外卖时可以刷手机 gather()同时执行多个任务 同时点咖啡和蛋糕 Semaphore限制并发数 餐厅只有5个厨师 Queue任务排队 取号机 流式响应 边生成边返回 打字机效果
1. 异步编程基础
- async/await 语法:定义和调用异步函数
- asyncio.gather():并发执行多个协程
- asyncio.create_task():创建后台任务
- 并发 vs 并行:理解 I/O 密集型任务的优化
🎯 小白注意:并发 ≠ 并行
- 并发(Concurrent):交替执行多个任务(单核也能做到)
- 并行(Parallel):真正同时执行(需要多核)
异步是并发,适合 I/O 密集型任务(等网络、等文件)。 多进程是并行,适合 CPU 密集型任务(复杂计算)。
2. 异步工具
- 异步上下文管理器:
__aenter__和__aexit__ - 异步迭代器:
__aiter__和__anext__ - 异步工具类:实现
arun()方法 - 异步装饰器:包装异步函数
3. 实战技巧
- 流式响应:实时生成和展示结果
- 批处理系统:队列 + 工作协程 + Semaphore
- 重试机制:指数退避策略
- 性能监控:跟踪耗时和错误率
知识自测
🎯 小白提示:做题前的核心概念回顾
- 异步 = 不傻等:遇到等待就去做别的事
- I/O 密集型用异步:网络请求、文件读写、数据库查询
- CPU 密集型用多进程:复杂计算、图像处理
- 关键语法:
async def定义、await等待、gather()并发
选择题
下面哪个不是使用异步编程的主要原因?
- A. 提高 I/O 密集型任务的效率
- B. 实现 CPU 密集型任务的并行计算
- C. 减少等待时间
- D. 提高系统吞吐量
asyncio.gather()和asyncio.wait()的主要区别是?- A. gather 返回结果列表,wait 返回完成和待完成的集合
- B. gather 更快
- C. wait 更安全
- D. 没有区别
异步上下文管理器需要实现哪些方法?
- A.
__enter__和__exit__ - B.
__aenter__和__aexit__ - C.
__init__和__del__ - D.
__call__和__await__
- A.
查看答案
- B - 异步编程主要用于 I/O 密集型任务,CPU 密集型任务需要多进程
- A - gather 返回结果列表,wait 返回 (done, pending) 集合
- B - 异步上下文管理器使用
__aenter__和__aexit__
高难度编程挑战
🎯 小白须知:这些挑战是干什么的?
这两个挑战是进阶练习,用来检验你对异步编程的深入理解。
- 挑战 1(调度器):像医院的叫号系统——管理任务的优先级、等待、执行
- 挑战 2(分布式):像外卖平台——多个骑手(Agent)协作完成多个订单
如果感觉太难,可以先跳过,学完后面的章节再回来挑战!
挑战 1:智能任务调度器(难度:⭐⭐⭐⭐)
🎯 小白理解指南:什么是"任务调度器"?
想象你是医院的导诊台,需要安排病人看病:
调度器功能 医院比喻 优先级队列 急诊优先、老人优先 任务依赖 先抽血,再看化验结果 并发限制 只有 5 个诊室 超时控制 等太久就重新排队 重试机制 检查失败,重做一次 这个调度器就是用代码实现这套"医院管理系统"!
需求: 实现一个智能的异步任务调度器,支持以下功能:
- 优先级队列(高优先级任务优先执行)
- 任务依赖(任务 B 依赖任务 A 的结果)
- 并发限制(最多同时执行 N 个任务)
- 超时控制(任务超时自动取消)
- 重试机制(失败任务自动重试)
- 实时监控(显示任务状态和进度)
代码框架:
🎯 代码解读:关键概念速查
代码 作用 生活比喻 heapq优先级队列(自动排序) VIP 通道 TaskStatus任务状态枚举 订单状态:待处理/进行中/已完成 @dataclass自动生成 __init__等方法简化代码的语法糖 Semaphore限制并发数 只有 N 个窗口 dependencies任务依赖关系 先做 A 才能做 B
python
import asyncio
from typing import List, Dict, Any, Optional, Callable, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import heapq # 🎯 堆队列,自动按优先级排序
class TaskStatus(Enum):
"""任务状态——像订单的状态流转"""
PENDING = "pending" # 待处理
WAITING = "waiting" # 等待依赖(比如等化验结果)
RUNNING = "running" # 执行中
COMPLETED = "completed" # 已完成
FAILED = "failed" # 失败
TIMEOUT = "timeout" # 超时
CANCELLED = "cancelled" # 已取消
@dataclass # 🎯 自动生成 __init__、__repr__ 等方法
class Task:
"""任务数据类"""
id: str
func: Callable
args: tuple = field(default_factory=tuple)
kwargs: dict = field(default_factory=dict)
priority: int = 1 # 数字越小优先级越高
dependencies: Set[str] = field(default_factory=set)
timeout: float = 30.0
max_retries: int = 3
retry_delay: float = 1.0
# 运行时状态
status: TaskStatus = TaskStatus.PENDING
result: Any = None
error: Optional[Exception] = None
retries: int = 0
created_at: datetime = field(default_factory=datetime.now)
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
def __lt__(self, other):
"""支持优先级队列"""
return self.priority < other.priority
class SmartScheduler:
"""智能任务调度器"""
def __init__(self, max_workers: int = 5):
self.max_workers = max_workers
# TODO: 实现以下功能
# 1. 使用 heapq 实现优先级队列
# 2. 实现依赖关系管理
# 3. 使用 Semaphore 控制并发
# 4. 实现超时和重试
# 5. 实现实时监控
pass
async def add_task(self, task: Task) -> str:
"""添加任务"""
# TODO: 实现任务添加逻辑
pass
async def process_task(self, task: Task) -> Task:
"""处理单个任务"""
# TODO: 实现任务处理逻辑
# - 检查依赖是否完成
# - 执行任务(带超时)
# - 处理重试
pass
async def run(self) -> Dict[str, Task]:
"""运行调度器"""
# TODO: 实现主调度循环
pass
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
# TODO: 返回任务状态统计
pass
# 测试代码
async def test_scheduler():
scheduler = SmartScheduler(max_workers=3)
# 定义测试任务
async def task_a():
await asyncio.sleep(1)
return "A完成"
async def task_b(a_result: str):
await asyncio.sleep(1)
return f"B完成(依赖: {a_result})"
async def task_c():
await asyncio.sleep(0.5)
return "C完成"
async def task_fail():
raise Exception("模拟失败")
# 添加任务
task_a_id = await scheduler.add_task(Task(
id="task-a",
func=task_a,
priority=1
))
task_b_id = await scheduler.add_task(Task(
id="task-b",
func=task_b,
priority=2,
dependencies={"task-a"}
))
await scheduler.add_task(Task(
id="task-c",
func=task_c,
priority=1
))
await scheduler.add_task(Task(
id="task-fail",
func=task_fail,
max_retries=2
))
# 运行调度器
results = await scheduler.run()
# 打印结果
print("\n=== 任务结果 ===")
for task_id, task in results.items():
print(f"{task_id}: {task.status.value} - {task.result or task.error}")
print(f"\n=== 统计信息 ===")
print(scheduler.get_stats())
# asyncio.run(test_scheduler())评分标准:
- 优先级队列正确实现(20分)
- 依赖关系正确处理(25分)
- 并发控制和超时处理(20分)
- 重试机制(15分)
- 实时监控和统计(20分)
挑战 2:分布式 Agent 系统(难度:⭐⭐⭐⭐⭐)
🎯 小白理解指南:什么是"分布式 Agent 系统"?
想象你在运营一个外卖平台:
系统组件 外卖平台比喻 Agent 外卖骑手(各有专长:送餐、取餐) Coordinator 调度中心(分配订单) 能力注册 骑手登记自己会送什么区域 任务分发 根据骑手位置分配订单 消息传递 骑手和调度中心的对讲机 负载均衡 不让一个骑手太忙 容错机制 骑手请假,订单转给别人 为什么这么难? 因为需要同时考虑:
- 多个 Agent 并发运行
- Agent 之间的通信
- 任务的智能分配
- 故障的自动恢复
需求: 设计一个分布式 Agent 系统,支持多个 Agent 协作完成复杂任务。
核心功能:
- Agent 注册与发现:Agent 可以注册自己的能力
- 任务分发:根据 Agent 能力自动分发任务
- 消息传递:Agent 之间可以异步通信
- 负载均衡:自动分配任务到空闲 Agent
- 容错机制:Agent 失败时自动重新分配任务
- 协调器:中央协调器管理所有 Agent
代码框架:
python
import asyncio
from typing import Dict, List, Set, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
from abc import ABC, abstractmethod
import json
class AgentCapability(Enum):
"""Agent 能力"""
SEARCH = "search"
CALCULATE = "calculate"
SUMMARIZE = "summarize"
TRANSLATE = "translate"
class MessageType(Enum):
"""消息类型"""
TASK_REQUEST = "task_request"
TASK_RESULT = "task_result"
AGENT_STATUS = "agent_status"
HEARTBEAT = "heartbeat"
@dataclass
class Message:
"""消息数据类"""
type: MessageType
sender: str
receiver: str
content: Any
timestamp: float = field(default_factory=lambda: asyncio.get_event_loop().time())
@dataclass
class AgentInfo:
"""Agent 信息"""
id: str
capabilities: Set[AgentCapability]
status: str = "idle" # idle, busy, offline
tasks_completed: int = 0
last_heartbeat: float = field(default_factory=lambda: asyncio.get_event_loop().time())
class BaseAgent(ABC):
"""Agent 基类"""
def __init__(self, agent_id: str, capabilities: Set[AgentCapability]):
self.id = agent_id
self.capabilities = capabilities
self.message_queue: asyncio.Queue = asyncio.Queue()
self.coordinator: Optional['Coordinator'] = None
self.running = False
async def connect(self, coordinator: 'Coordinator'):
"""连接到协调器"""
self.coordinator = coordinator
await coordinator.register_agent(self)
@abstractmethod
async def process_task(self, task: Dict[str, Any]) -> Any:
"""处理任务(子类实现)"""
pass
async def run(self):
"""运行 Agent"""
# TODO: 实现 Agent 主循环
# - 接收消息
# - 处理任务
# - 发送心跳
pass
async def send_message(self, message: Message):
"""发送消息"""
# TODO: 实现消息发送
pass
class Coordinator:
"""协调器"""
def __init__(self):
self.agents: Dict[str, AgentInfo] = {}
self.message_bus: Dict[str, asyncio.Queue] = {}
self.task_queue: asyncio.Queue = asyncio.Queue()
self.results: Dict[str, Any] = {}
async def register_agent(self, agent: BaseAgent):
"""注册 Agent"""
# TODO: 实现 Agent 注册逻辑
pass
async def assign_task(self, task: Dict[str, Any]) -> str:
"""分配任务"""
# TODO: 实现任务分配逻辑
# - 根据能力选择 Agent
# - 实现负载均衡
pass
async def monitor_agents(self):
"""监控 Agent 状态"""
# TODO: 实现监控逻辑
# - 检查心跳
# - 处理 Agent 失败
pass
async def run(self):
"""运行协调器"""
# TODO: 实现协调器主循环
pass
# 具体 Agent 实现
class SearchAgent(BaseAgent):
"""搜索 Agent"""
def __init__(self, agent_id: str):
super().__init__(agent_id, {AgentCapability.SEARCH})
async def process_task(self, task: Dict[str, Any]) -> Any:
query = task.get("query")
await asyncio.sleep(1) # 模拟搜索
return {"results": f"搜索结果: {query}"}
class CalculatorAgent(BaseAgent):
"""计算 Agent"""
def __init__(self, agent_id: str):
super().__init__(agent_id, {AgentCapability.CALCULATE})
async def process_task(self, task: Dict[str, Any]) -> Any:
expression = task.get("expression")
await asyncio.sleep(0.5)
return {"result": eval(expression)}
# 测试代码
async def test_distributed_system():
# 创建协调器
coordinator = Coordinator()
# 创建 Agent
agents = [
SearchAgent("search-1"),
SearchAgent("search-2"),
CalculatorAgent("calc-1"),
]
# 连接 Agent
for agent in agents:
await agent.connect(coordinator)
# 提交任务
tasks = [
{"type": AgentCapability.SEARCH, "query": "Python async"},
{"type": AgentCapability.CALCULATE, "expression": "10 + 20"},
{"type": AgentCapability.SEARCH, "query": "LangChain"},
]
# TODO: 运行系统并获取结果
pass
# asyncio.run(test_distributed_system())评分标准:
- Agent 注册与能力管理(20分)
- 任务分发和负载均衡(25分)
- 消息传递机制(20分)
- 容错和故障恢复(20分)
- 系统监控和统计(15分)
参考解决方案
挑战 1 参考实现(点击展开)
python
import asyncio
from typing import List, Dict, Any, Optional, Callable, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import heapq
class TaskStatus(Enum):
PENDING = "pending"
WAITING = "waiting"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
TIMEOUT = "timeout"
CANCELLED = "cancelled"
@dataclass
class Task:
id: str
func: Callable
args: tuple = field(default_factory=tuple)
kwargs: dict = field(default_factory=dict)
priority: int = 1
dependencies: Set[str] = field(default_factory=set)
timeout: float = 30.0
max_retries: int = 3
retry_delay: float = 1.0
status: TaskStatus = TaskStatus.PENDING
result: Any = None
error: Optional[Exception] = None
retries: int = 0
created_at: datetime = field(default_factory=datetime.now)
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
def __lt__(self, other):
return self.priority < other.priority
class SmartScheduler:
def __init__(self, max_workers: int = 5):
self.max_workers = max_workers
self.semaphore = asyncio.Semaphore(max_workers)
self.tasks: Dict[str, Task] = {}
self.pending_queue: List[Task] = []
self.completed_dependencies: Dict[str, Any] = {}
self.running_tasks: Set[str] = set()
async def add_task(self, task: Task) -> str:
self.tasks[task.id] = task
heapq.heappush(self.pending_queue, task)
return task.id
def _can_run(self, task: Task) -> bool:
"""检查任务是否可以运行"""
return all(dep in self.completed_dependencies for dep in task.dependencies)
async def _execute_with_timeout(self, task: Task) -> Any:
"""执行任务(带超时)"""
# 注入依赖结果
if task.dependencies:
dep_results = {dep: self.completed_dependencies[dep]
for dep in task.dependencies}
task.kwargs['dependencies'] = dep_results
return await asyncio.wait_for(
task.func(*task.args, **task.kwargs),
timeout=task.timeout
)
async def process_task(self, task: Task) -> Task:
async with self.semaphore:
task.status = TaskStatus.RUNNING
task.started_at = datetime.now()
self.running_tasks.add(task.id)
while task.retries <= task.max_retries:
try:
task.result = await self._execute_with_timeout(task)
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
self.completed_dependencies[task.id] = task.result
break
except asyncio.TimeoutError:
task.status = TaskStatus.TIMEOUT
task.error = Exception("任务超时")
break
except Exception as e:
task.error = e
task.retries += 1
if task.retries <= task.max_retries:
await asyncio.sleep(task.retry_delay * task.retries)
else:
task.status = TaskStatus.FAILED
break
self.running_tasks.remove(task.id)
return task
async def run(self) -> Dict[str, Task]:
workers = []
while self.pending_queue or workers:
# 启动可运行的任务
ready_tasks = []
remaining = []
while self.pending_queue:
task = heapq.heappop(self.pending_queue)
if self._can_run(task):
ready_tasks.append(task)
else:
task.status = TaskStatus.WAITING
remaining.append(task)
# 重新加入队列
for task in remaining:
heapq.heappush(self.pending_queue, task)
# 启动新任务
for task in ready_tasks:
workers.append(asyncio.create_task(self.process_task(task)))
# 等待任务完成
if workers:
done, workers = await asyncio.wait(
workers,
return_when=asyncio.FIRST_COMPLETED
)
await asyncio.sleep(0.1) # 短暂休眠,避免busy loop
return self.tasks
def get_stats(self) -> Dict[str, Any]:
status_counts = {}
for status in TaskStatus:
status_counts[status.value] = sum(
1 for t in self.tasks.values() if t.status == status
)
return {
"total": len(self.tasks),
"status": status_counts,
"completed": sum(1 for t in self.tasks.values()
if t.status == TaskStatus.COMPLETED),
"failed": sum(1 for t in self.tasks.values()
if t.status in [TaskStatus.FAILED, TaskStatus.TIMEOUT])
}学习资源
🎯 小白学习建议:异步编程的学习路线
- 先理解概念:搞清楚"为什么要异步"比"怎么写"更重要
- 多写多练:光看不练永远学不会,试着改改示例代码
- 从简单开始:先用
gather()并发几个任务,再学复杂的- 调试技巧:多用
print()看执行顺序,理解并发行为
推荐阅读
进阶主题
asyncio内部实现原理uvloop高性能事件循环- 异步数据库操作(asyncpg, motor)
- WebSocket 和 Server-Sent Events