深入理解 Python 异步编程(下)
# 一、GIL 对异步编程的影响
# 1.1 GIL 是什么
全局解释器锁(Global Interpreter Lock,GIL)是 CPython 解释器的一个机制,它确保同一时间只有一个线程可以执行 Python 字节码。这意味着在多核 CPU 环境下,多线程程序无法真正利用多核优势执行计算密集型任务。
import threading
import time
def cpu_intensive_task():
count = 0
for _ in range(10**8):
count += 1
return count
# 单线程执行
start = time.time()
cpu_intensive_task()
cpu_intensive_task()
print(f"单线程耗时: {time.time() - start:.2f}秒") # 约10秒
# 多线程执行
start = time.time()
t1 = threading.Thread(target=cpu_intensive_task)
t2 = threading.Thread(target=cpu_intensive_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"多线程耗时: {time.time() - start:.2f}秒") # 仍约10秒,GIL导致无法并行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 1.2 异步编程如何规避 GIL 限制
异步编程通过事件循环和协程在单线程中调度任务,避免了 GIL 的影响:
- I/O 密集型任务:协程在 I/O 阻塞时释放 CPU,让其他协程执行
- CPU 密集型任务:需配合多进程或线程池执行
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
# CPU密集型任务放入进程池
async def cpu_task():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
ProcessPoolExecutor(),
cpu_intensive_task
)
return result
# 异步执行两个CPU任务
start = time.time()
async def main():
task1 = cpu_task()
task2 = cpu_task()
await asyncio.gather(task1, task2)
asyncio.run(main())
print(f"异步+进程池耗时: {time.time() - start:.2f}秒") # 约5秒,利用多核
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 二、asyncio 踩坑经验
# 2.1 回调地狱与解决方案
问题:多层回调嵌套导致代码可读性差
# 反例:回调嵌套
def callback1():
print("回调1")
def callback2():
print("回调2")
def callback3():
print("回调3")
asyncio.get_event_loop().call_soon(callback3)
asyncio.get_event_loop().call_soon(callback2)
asyncio.get_event_loop().call_soon(callback1)
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
解决方案:使用 async/await 重构
# 正例:async/await风格
async def chain_tasks():
await asyncio.sleep(0.1)
print("任务1")
await asyncio.sleep(0.1)
print("任务2")
await asyncio.sleep(0.1)
print("任务3")
asyncio.run(chain_tasks())
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 2.2 异常处理陷阱
问题:未捕获的异常会导致事件循环崩溃
# 反例:未处理异常
async def faulty_task():
raise ValueError("任务出错")
async def main():
task = asyncio.create_task(faulty_task())
await asyncio.sleep(0.1) # 未等待任务,异常未捕获
# 运行会抛出未处理的异常
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
解决方案:使用 try/except 或 Task 异常处理
# 正例:异常处理
async def safe_main():
task = asyncio.create_task(faulty_task())
try:
await task
except ValueError as e:
print(f"捕获异常: {e}")
asyncio.run(safe_main())
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 2.3 任务取消不当
问题:取消任务时未正确处理资源释放
# 反例:未处理取消
async def task_with_resource():
print("获取资源")
try:
await asyncio.sleep(10)
finally:
print("资源未释放") # 任务取消时不会执行
async def main():
task = asyncio.create_task(task_with_resource())
await asyncio.sleep(0.1)
task.cancel()
await task # 会抛出CancelledError
# 运行会输出"获取资源",但资源未释放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
解决方案:使用 try/except 捕获取消异常
# 正例:正确处理取消
async def task_with_resource():
print("获取资源")
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("任务取消,释放资源")
raise # 重新抛出取消异常
asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 三、编程模型对比分析
# 3.1 回调、协程、绿程、线程对比
模型 | 调度方式 | 并发支持 | 优点 | 缺点 |
---|---|---|---|---|
回调 | 事件驱动 | 单线程 | 轻量级,底层实现简单 | 代码可读性差,回调地狱 |
协程 | 协作式多任务 | 单线程 | 代码接近同步,性能高效 | 需要语言层面支持 |
绿程 | 用户态线程 | 单线程 | 轻量级,切换成本低 | 需框架支持,调度可控性差 |
线程 | 操作系统调度 | 多线程 | 原生支持,适合 I/O 和 CPU 任务 | 上下文切换开销大,GIL 限制 |
# 3.2 多进程、多线程、协程适用场景
模型 | 适用场景 | 示例场景 |
---|---|---|
多进程 | CPU 密集型任务,需利用多核 | 科学计算、图像处理 |
多线程 | I/O 密集型任务,需阻塞操作 | 网络请求、文件读写 |
协程 | 高并发 I/O 密集型任务,需单线程处理 | 爬虫、Web 服务器、消息队列 |
# 四、框架对比与技术选型
# 4.1 Gevent/libev、uvloop/libuv 与 asyncio
框架 | 底层实现 | 特点 | 适用场景 |
---|---|---|---|
asyncio | Python 原生 | 标准库,功能全面,跨平台 | 通用异步编程 |
Gevent | libev | 猴子补丁,兼容同步代码 | 快速改造现有同步代码 |
uvloop | libuv | 性能优化,比 asyncio 快数倍 | 高性能网络服务 |
# 4.2 选型建议
- 初学者:从 asyncio 开始,利用标准库学习异步编程
- 性能优先:uvloop+asyncio 组合,提升网络 IO 性能
- 代码兼容性:Gevent 适合改造现有同步代码为异步
- 跨平台需求:asyncio 原生支持 Windows 和 Unix
# 五、Python 异步编程指导细则
# 5.1 代码结构最佳实践
- 单一职责:每个协程只做一件事
- 异步上下文管理器:使用
async with
管理资源 - 避免阻塞:所有 I/O 操作必须异步化
# 推荐写法
async def fetch_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
1
2
3
4
5
2
3
4
5
# 5.2 性能优化策略
- 连接池复用:重用 HTTP 会话、数据库连接
- 批量操作:合并小 I/O 操作减少上下文切换
- 合理设置超时:使用
asyncio.wait_for
避免长时间阻塞
# 连接池示例
async def create_pool():
pool = aiohttp.TCPConnector(limit=100)
session = aiohttp.ClientSession(connector=pool)
return session
1
2
3
4
5
2
3
4
5
# 5.3 错误处理规范
- 全局异常捕获:使用
loop.set_exception_handler
- 任务级异常处理:每个任务包含 try/except
- 超时处理:所有异步操作设置合理超时
# 全局异常处理
def handle_exception(loop, context):
print(f"捕获全局异常: {context['exception']}")
loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
1
2
3
4
5
6
2
3
4
5
6
# 六、综合案例:异步爬虫系统
# 6.1 需求背景
开发一个高性能异步爬虫,需满足:
- 同时爬取 1000+网页
- 支持失败重试
- 控制并发量避免封禁
- 处理不同域名的请求
# 6.2 实现方案
import asyncio
import aiohttp
import random
from urllib.parse import urlparse
class AsyncCrawler:
def __init__(self, concurrency=100, retries=3, timeout=10):
self.concurrency = concurrency
self.retries = retries
self.timeout = timeout
self.semaphore = asyncio.Semaphore(concurrency)
self.session = None
self.domain_limits = {} # 域名级并发控制
async def init_session(self):
connector = aiohttp.TCPConnector(
limit=self.concurrency,
limit_per_host=10 # 每个域名最多10个连接
)
self.session = aiohttp.ClientSession(connector=connector)
async def fetch(self, url):
domain = urlparse(url).netloc
# 域名级并发控制
if domain not in self.domain_limits:
self.domain_limits[domain] = asyncio.Semaphore(10)
sem = self.domain_limits[domain]
async with self.semaphore, sem:
for attempt in range(self.retries):
try:
async with asyncio.timeout(self.timeout):
async with self.session.get(url) as response:
if response.status == 200:
return await response.text()
print(f"请求失败: {url}, 状态码: {response.status}")
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
print(f"尝试{attempt+1}/{self.retries}失败: {url}, 错误: {e}")
await asyncio.sleep(random.uniform(0.5, 2.0)) # 退避重试
return None
async def crawl_many(self, urls):
await self.init_session()
tasks = [self.fetch(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
await self.session.close()
return results
# 使用示例
async def main():
urls = [f"https://example.com/page/{i}" for i in range(100)]
crawler = AsyncCrawler(concurrency=200)
results = await crawler.crawl_many(urls)
print(f"成功爬取{sum(1 for r in results if r is not None)}页")
asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 6.3 关键技术点
双层并发控制:
- 全局并发限制
self.semaphore
- 域名级并发限制
domain_limits
- 全局并发限制
智能重试机制:
- 固定重试次数
retries
- 随机退避策略
random.uniform
- 固定重试次数
资源管理:
- 异步上下文管理器管理会话和连接
- 超时控制避免长时间阻塞
# 七、总结与进阶方向
# 7.1 核心知识回顾
- 异步编程优势:高并发 I/O 处理,单线程高效调度
- GIL 影响:异步编程通过单线程规避 GIL,但 CPU 任务需结合多进程
- 模型选择:根据任务类型选择协程、线程或进程
- 框架选型:asyncio 作为基础,uvloop 提升性能
# 7.2 进阶学习方向
- 网络编程:深入学习 TCP/UDP 异步编程
- 分布式系统:异步框架与分布式任务调度结合
- 性能优化:使用
cProfile
分析异步程序性能 - 实战项目:开发异步 Web 框架、消息队列或爬虫系统
通过掌握上述内容,您将能够在 Python 中熟练运用异步编程解决高并发问题,构建高效、可扩展的异步应用系统。
编辑 (opens new window)
上次更新: 2025-06-25, 07:15:38