asyncio 协程面试题
# 1. 什么是异步编程?asyncio 与多线程、多进程的区别是什么
回答:
异步编程是一种非阻塞的编程模式,通过事件循环调度任务,允许程序在等待 IO 操作(如网络请求、文件读写)时继续执行其他任务,避免线程阻塞。
asyncio 与多线程的区别:
- 线程模型:多线程通过操作系统内核调度,每个线程有独立栈空间,适合 IO 密集型和 CPU 密集型任务,但存在线程切换开销和锁竞争问题。
- asyncio 模型:单线程事件循环,通过协程(coroutine)实现任务切换,切换开销极低,适合高并发 IO 场景,但 CPU 密集型任务会阻塞事件循环。
asyncio 与多进程的区别:
- 多进程利用多核 CPU,适合 CPU 密集型任务,但进程间通信复杂,内存占用高。
- asyncio 单线程模型无法利用多核,但可通过
loop.run_in_executor()
结合线程池/进程池处理 CPU 任务。
# 协程调度原理
- 非抢占式执行:协程的执行权转移,要靠显式的
await
语句。只有当协程主动让出控制权时,事件循环才会去调度其他协程。 - 事件循环驱动:事件循环就像协程调度的 “指挥官”,它负责管理协程的状态,并且在协程可以继续执行(比如 I/O 操作完成)时恢复其执行。
- 异步 I/O 绑定:协程主要应用于 I/O 密集型任务。在进行 I/O 操作时,协程会暂停执行,此时事件循环就能去处理其他任务了。
# GIL 对协程的影响
- 不影响 I/O 密集型任务:因为协程在等待 I/O 操作时,会主动释放控制权,所以 GIL 不会对其产生影响。多个协程可以在同一个线程中高效地处理 I/O 操作。
- CPU 密集型任务受限:如果协程中包含 CPU 密集型代码,那么 GIL 会导致同一时刻只有一个线程能执行 Python 字节码。不过,这种情况下可以通过
asyncio.to_thread()
将 CPU 密集型任务放到线程池中执行。
import asyncio
async def task1():
print("Task 1: Start")
await asyncio.sleep(1) # 主动让出控制权
print("Task 1: End")
async def task2():
print("Task 2: Start")
await asyncio.sleep(0.5) # 主动让出控制权
print("Task 2: End")
async def main():
await asyncio.gather(task1(), task2())
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2. 解释 async/await 的工作原理,与生成器有什么关系
回答:
async def
定义的是原生协程(coroutine),await
用于挂起协程并等待另一个协程的结果,本质是通过事件循环调度协程的执行。- 在 Python 3.4 中,asyncio 最初基于生成器(generator)实现协程(如
@asyncio.coroutine
和yield from
),而 Python 3.5+引入的async/await
是原生协程语法,更简洁且性能更好。 - 原生协程与生成器的区别:
- 生成器通过
yield
产出值,通过send()
传入值; - 原生协程通过
await
等待结果,只能在async def
函数中使用,且不兼容生成器的send()
方法。
- 生成器通过
# 3. 事件循环(Event Loop)的作用是什么?如何在 asyncio 中获取事件循环
回答:
事件循环是 asyncio 的核心,负责调度协程的执行,监听 IO 事件并在事件就绪时恢复协程。
获取事件循环的方式:
# Python 3.7+推荐方式 import asyncio # 获取当前线程的事件循环 loop = asyncio.get_event_loop() # 创建新的事件循环(不推荐在生产环境直接使用) loop = asyncio.new_event_loop() # 运行事件循环直到协程完成 result = loop.run_until_complete(coroutine()) # 关闭事件循环(程序结束前调用) loop.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14注意: 在 Windows 系统中,默认事件循环为
ProactorEventLoop
,而 Unix 系统为SelectorEventLoop
,某些 API 可能存在兼容性差异。
# 4. Task 和 Future 的区别是什么?如何创建 Task
回答:
Future:表示一个异步操作的最终结果,是一个可等待对象(awaitable),通常由底层操作自动创建。
Task:是 Future 的子类,用于包装协程并将其加入事件循环调度,可理解为“正在运行的协程”。
创建 Task 的方式:
import asyncio async def my_coroutine(): await asyncio.sleep(1) return "Done" # 方式1:使用loop.create_task() loop = asyncio.get_event_loop() task = loop.create_task(my_coroutine()) # 方式2:使用asyncio.create_task()(Python 3.7+) task = asyncio.create_task(my_coroutine()) # 等待Task完成 loop.run_until_complete(task) print(task.result()) # 输出 "Done"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 5. 如何处理异步任务中的异常?未处理的异常会导致什么问题
回答:
异常处理方式:
使用
try/except
捕获协程内的异常:async def task_with_exception(): try: await asyncio.sleep(1) raise ValueError("异常示例") except Exception as e: print(f"捕获异常: {e}")
1
2
3
4
5
6对 Task 使用
add_done_callback()
处理异常:def handle_exception(task): if task.exception(): print(f"Task异常: {task.exception()}") task = asyncio.create_task(task_with_exception()) task.add_done_callback(handle_exception)
1
2
3
4
5
6使用
asyncio.gather()
的return_exceptions
参数:async def main(): tasks = [task_with_exception(), another_task()] results = await asyncio.gather(*tasks, return_exceptions=True) for result in results: if isinstance(result, Exception): print(f"处理结果中的异常: {result}")
1
2
3
4
5
6
未处理异常的影响: 若协程抛出异常且未被捕获,事件循环会抛出
RuntimeError
并可能导致程序崩溃,因此必须确保异常被正确处理。
# 6. asyncio 中如何实现并发控制?比如限制同时运行的任务数
回答:
使用信号量(Semaphore):
信号量可控制并发任务的数量,适合限制对外部资源(如 API 接口、数据库连接)的访问频率。import asyncio async def task_with_semaphore(semaphore, id): async with semaphore: print(f"任务 {id} 开始执行") await asyncio.sleep(1) print(f"任务 {id} 执行完成") async def main(): # 限制最多3个任务并发 semaphore = asyncio.Semaphore(3) tasks = [task_with_semaphore(semaphore, i) for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15其他方式:
- 使用
asyncio.Semaphore
配合asyncio.wait()
手动管理任务队列; - 对耗时任务使用
loop.run_in_executor()
提交到线程池/进程池,避免阻塞事件循环。
- 使用
# 7. 异步上下文管理器(Async Context Manager)和异步迭代器(Async Iterator)的作用是什么
回答:
异步上下文管理器:
通过async with
语句使用,用于管理异步资源的生命周期(如连接池、文件句柄),需实现__aenter__()
和__aexit__()
异步方法。import asyncio class AsyncDatabase: async def __aenter__(self): print("连接数据库") return self async def __aexit__(self, exc_type, exc_val, exc_tb): print("关闭数据库连接") async def query(self): await asyncio.sleep(0.1) return "查询结果" async def main(): async with AsyncDatabase() as db: result = await db.query() print(result) asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20异步迭代器:
通过async for
语句迭代异步生成器,需实现__aiter__()
和__anext__()
方法,适用于流式处理异步数据(如网络数据流)。async def async_generator(): for i in range(3): await asyncio.sleep(0.1) yield i async def main(): async for item in async_generator(): print(item) asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
# 8. asyncio.sleep()是如何实现非阻塞的?与 time.sleep()的区别是什么
回答:
asyncio.sleep(delay)
是一个协程,通过事件循环注册一个定时器事件,在指定延迟后恢复协程执行,不会阻塞事件循环。time.sleep(delay)
是阻塞函数,会暂停当前线程,导致事件循环无法调度其他协程,在 asyncio 中使用会造成整个程序阻塞。示例对比:
import asyncio import time async def async_sleep(): print(f"开始异步睡眠: {time.strftime('%X')}") await asyncio.sleep(1) # 非阻塞 print(f"结束异步睡眠: {time.strftime('%X')}") def sync_sleep(): print(f"开始同步睡眠: {time.strftime('%X')}") time.sleep(1) # 阻塞线程 print(f"结束同步睡眠: {time.strftime('%X')}") async def main(): # 异步睡眠不阻塞其他任务 await asyncio.gather(async_sleep(), async_sleep()) # 同步睡眠会阻塞事件循环 loop = asyncio.get_event_loop() await loop.run_in_executor(None, sync_sleep) # 需放入线程池执行 asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 9. 异步编程的优势和适用场景是什么?有哪些局限性
回答:
优势:
- 高并发处理 IO 密集型任务,如网络请求、数据库操作、文件读写;
- 单线程模型减少线程切换开销,内存占用低;
- 代码结构更简洁,避免回调地狱(Callback Hell)。
适用场景:
- 网络服务(Web 服务器、API 客户端);
- 实时数据处理(日志收集、消息队列);
- 分布式系统中的异步通信。
局限性:
- 不适合 CPU 密集型任务(需配合线程池/进程池);
- 调试难度较高,异常堆栈可能不完整;
- 与同步代码集成时需小心处理线程安全问题。
# 10. 请对比 asyncio.gather()和 asyncio.wait()的用法和区别
回答:
特性 | asyncio.gather() | asyncio.wait() |
---|---|---|
任务组织方式 | 接受多个协程/任务作为参数,统一管理 | 接受任务列表,通过return_when 控制返回时机 |
结果处理 | 按参数顺序返回结果,异常会直接抛出 | 返回完成的任务集合,需手动处理结果顺序 |
异常处理 | 可通过return_exceptions=True 捕获异常 | 需手动检查每个任务的异常状态 |
取消任务 | 调用gather.cancel() 取消所有任务 | 需手动取消未完成的任务 |
适用场景 | 简单场景下并行执行任务并获取有序结果 | 复杂场景下需要精细控制任务状态和顺序 |
示例对比:
import asyncio
async def task(i):
await asyncio.sleep(0.1)
return i
# 使用gather
async def use_gather():
tasks = [task(i) for i in range(3)]
results = await asyncio.gather(*tasks)
print("Gather结果:", results) # 输出 [0, 1, 2]
# 使用wait
async def use_wait():
tasks = [task(i) for i in range(3)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
results = [task.result() for task in done]
print("Wait结果:", results) # 输出 [0, 1, 2](顺序可能不同)
asyncio.run(asyncio.gather(use_gather(), use_wait()))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 11. 如何在 asyncio 中处理超时?请举例说明
回答:
使用asyncio.wait_for()
函数为协程设置超时时间,超时会抛出asyncio.TimeoutError
。
import asyncio
async def slow_operation():
await asyncio.sleep(2) # 模拟耗时操作
return "完成"
async def main():
try:
# 设置1秒超时
result = await asyncio.wait_for(slow_operation(), timeout=1)
print(result)
except asyncio.TimeoutError:
print("操作超时")
asyncio.run(main()) # 输出 "操作超时"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 12. 简述 asyncio 的底层实现原理(事件循环、IO 多路复用等)
回答:
- 事件循环核心机制:
事件循环基于 IO 多路复用(如 Unix 的select/poll/epoll
,Windows 的IOCP
),通过注册文件描述符(socket、pipe 等)的读写事件,当事件就绪时触发回调函数或恢复协程。 - 协程调度流程:
- 协程通过
await
挂起,将控制权交还给事件循环; - 事件循环继续处理其他就绪的 IO 事件或定时任务;
- 当
await
的目标完成时,事件循环将协程加入就绪队列,等待调度执行。
- 协程通过
- 与其他异步框架的对比:
- Node.js 使用单线程事件循环+回调函数,asyncio 使用原生协程+
await
语法更符合同步编程思维; - Tornado 基于回调函数,asyncio 的原生协程更易维护。
- Node.js 使用单线程事件循环+回调函数,asyncio 使用原生协程+
# 13. 如何在 asyncio 中集成同步代码?有哪些注意事项
回答:
通过loop.run_in_executor()
将同步函数提交到线程池或进程池执行,避免阻塞事件循环。
import asyncio
import time
def sync_function():
time.sleep(1) # 同步阻塞函数
return "同步结果"
async def main():
loop = asyncio.get_event_loop()
# 提交到默认线程池(ThreadPoolExecutor)
result1 = await loop.run_in_executor(None, sync_function)
# 提交到自定义进程池(适合CPU密集型任务)
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
result2 = await loop.run_in_executor(executor, sync_function)
print(result1, result2)
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
注意事项:
- 线程池适用于 IO 密集型同步任务,进程池适用于 CPU 密集型任务;
- 跨线程/进程传递数据需注意序列化问题(如使用 pickle);
- 避免频繁创建线程池/进程池,建议复用实例。
# 多个协程去操作同一数据,保持数据的原子性,是如何实现的?
在Python中,多个协程操作同一数据时,可以通过以下几种方式保证数据的原子性:
# 1. 使用异步锁(asyncio.Lock
)
通过锁机制确保同一时间只有一个协程可以访问共享资源。
示例代码:
import asyncio
shared_data = 0
lock = asyncio.Lock()
async def safe_increment():
global shared_data
async with lock: # 同一时间只允许一个协程执行此代码块
temp = shared_data
await asyncio.sleep(0.1) # 模拟耗时操作
shared_data = temp + 1
async def main():
tasks = [safe_increment() for _ in range(10)]
await asyncio.gather(*tasks)
print(f"Final value: {shared_data}") # 输出: 10
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 2. 原子操作(使用不可中断的操作)
如果操作本身是原子的(如单个变量赋值),则无需额外同步。
示例:
shared_counter = 0
async def atomic_update():
nonlocal shared_counter
shared_counter += 1 # Python的 += 操作在CPython中通常是原子的
2
3
4
5
注意:复杂操作(如a = a + 1
)可能涉及多个步骤,并非原子操作,仍需锁保护。
# 3. 使用队列(asyncio.Queue
)
通过队列将共享资源的操作串行化,避免竞争条件。
示例:
import asyncio
queue = asyncio.Queue()
shared_data = []
async def producer():
for i in range(5):
await queue.put(i)
await asyncio.sleep(0.1)
async def consumer():
while True:
item = await queue.get()
shared_data.append(item) # 所有修改通过队列串行化
queue.task_done()
async def main():
task1 = asyncio.create_task(producer())
task2 = asyncio.create_task(consumer())
await task1
await queue.join()
task2.cancel()
print(shared_data) # 输出: [0, 1, 2, 3, 4]
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 4. 不可变数据结构+原子替换
使用不可变数据结构(如tuple
、frozenset
),通过原子替换保证线程安全。
示例:
shared_state = ()
async def update_state(new_item):
nonlocal shared_state
# 创建新的不可变对象并原子替换
shared_state = (*shared_state, new_item)
2
3
4
5
6
# 5. 使用线程安全的数据结构
对于复杂场景,可以使用concurrent.futures
中的线程安全容器(需结合asyncio.to_thread
)。
示例:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
thread_safe_queue = Queue()
executor = ThreadPoolExecutor()
async def add_to_queue(item):
loop = asyncio.get_running_loop()
await loop.run_in_executor(executor, thread_safe_queue.put, item)
2
3
4
5
6
7
8
9
10
# 总结
方法 | 适用场景 | 示例代码 |
---|---|---|
异步锁 (asyncio.Lock ) | 保护临界区代码 | async with lock: ... |
原子操作 | 简单变量赋值 | x += 1 |
队列 (asyncio.Queue ) | 串行化操作 | await queue.put(data) |
不可变数据结构 | 频繁替换状态 | shared_state = new_state |
线程安全容器 | 复杂共享对象 | 使用queue.Queue + 线程池 |
最佳实践:优先使用asyncio.Lock
或asyncio.Queue
,它们专为协程设计,能避免死锁和竞态条件。
# 协程调度如何优雅退出?
在Python中,协程的优雅退出需要考虑任务取消、资源清理和异常处理。以下是几种常见的优雅退出方法:
# 1. 使用 asyncio.gather
的 return_exceptions=True
捕获所有任务的异常,避免一个任务失败导致整个程序崩溃。
import asyncio
async def task(delay):
try:
await asyncio.sleep(delay)
return f"Task completed after {delay}s"
except asyncio.CancelledError:
print(f"Task with delay {delay} was cancelled")
raise # 重新抛出异常以保持取消状态
async def main():
tasks = [task(1), task(10)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
print(f"Caught exception: {result}")
else:
print(result)
# 运行主函数
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 2. 注册信号处理器
监听系统信号(如 SIGINT
、SIGTERM
),主动触发任务取消。
import asyncio
import signal
async def long_running_task():
try:
while True:
print("Working...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Task is cleaning up...")
await asyncio.sleep(0.5) # 模拟清理操作
print("Task cleanup completed")
raise
async def main():
task = asyncio.create_task(long_running_task())
# 注册信号处理器
loop = asyncio.get_running_loop()
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for s in signals:
loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(shutdown(s, loop, task))
)
await task
async def shutdown(signal, loop, task):
print(f"Received exit signal {signal.name}")
task.cancel()
await task
loop.stop()
# 运行主函数
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 3. 使用 async with
管理资源
通过 async context manager
确保资源在任务取消时被正确释放。
import asyncio
class DatabaseConnection:
def __init__(self):
self.connected = False
async def connect(self):
print("Connecting to database...")
await asyncio.sleep(0.5)
self.connected = True
print("Connected")
async def disconnect(self):
print("Disconnecting from database...")
await asyncio.sleep(0.5)
self.connected = False
print("Disconnected")
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.disconnect()
async def worker():
async with DatabaseConnection() as db:
try:
while True:
print("Processing data...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Worker is being cancelled")
raise
# 运行主函数
asyncio.run(worker())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 4. 逐级取消任务层次结构
对于复杂的任务树,需要从顶层到底层逐级取消,确保每个子任务都有机会清理资源。
import asyncio
async def child_task(name):
try:
print(f"{name} started")
await asyncio.sleep(10)
print(f"{name} completed")
except asyncio.CancelledError:
print(f"{name} is being cancelled")
raise
async def parent_task():
child1 = asyncio.create_task(child_task("Child 1"))
child2 = asyncio.create_task(child_task("Child 2"))
try:
await asyncio.gather(child1, child2)
except asyncio.CancelledError:
print("Parent task is being cancelled")
child1.cancel()
child2.cancel()
await asyncio.gather(child1, child2, return_exceptions=True)
raise
# 运行主函数
asyncio.run(parent_task())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 5. 设置超时退出
使用 asyncio.wait_for
或 asyncio.wait
设置任务最大执行时间。
import asyncio
async def slow_task():
await asyncio.sleep(10)
return "Done"
async def main():
try:
# 设置任务超时时间为3秒
result = await asyncio.wait_for(slow_task(), timeout=3)
print(result)
except asyncio.TimeoutError:
print("Task timed out")
# 运行主函数
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 总结
- 捕获异常:使用
return_exceptions=True
避免任务间的异常传播。 - 信号处理:监听系统信号,主动触发取消逻辑。
- 资源管理:通过
async with
确保资源自动释放。 - 层次取消:对于复杂任务,从顶层逐级取消子任务。
- 超时控制:使用
wait_for
设置任务最大执行时间。
优雅退出的核心是给每个协程处理取消请求的机会,避免资源泄漏和数据不一致。
# 协程在死锁状态下如何退出?
在Python协程中,死锁通常发生在多个协程互相等待对方释放资源(如锁、信号量),导致所有协程无法继续执行的情况。解决死锁需要打破其必要条件(互斥、请求与保持、不剥夺、循环等待),以下是几种针对性的退出方案:
# 一、死锁的典型场景与原因
import asyncio
async def deadlock_demo():
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()
async def task1():
async with lock1:
await asyncio.sleep(0.1)
async with lock2: # 等待lock2,此时task2在等待lock1
print("Task1 done")
async def task2():
async with lock2:
await asyncio.sleep(0.1)
async with lock1: # 等待lock1,此时task1在等待lock2
print("Task2 done")
# 同时运行两个任务,导致死锁
await asyncio.gather(task1(), task2())
# 运行此代码会导致死锁,程序无响应
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
死锁原因:
- 任务1获取
lock1
后等待lock2
,任务2获取lock2
后等待lock1
,形成循环等待。
# 二、死锁退出方案
# 1. 为资源获取设置超时(破坏“不剥夺”条件)
通过asyncio.wait_for
为获取锁或执行任务设置超时,超时后主动抛出异常打破死锁。
import asyncio
async def deadlock_with_timeout():
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()
async def task1():
try:
# 为获取lock2设置超时
async with asyncio.wait_for(lock2.acquire(), timeout=1):
print("Task1 acquired lock2")
except asyncio.TimeoutError:
print("Task1 timeout, releasing lock1")
lock1.release() # 主动释放已获取的锁
raise
async def task2():
try:
# 为获取lock1设置超时
async with asyncio.wait_for(lock1.acquire(), timeout=1):
print("Task2 acquired lock1")
except asyncio.TimeoutError:
print("Task2 timeout, releasing lock2")
lock2.release()
raise
# 先获取锁,模拟死锁前的状态
await lock1.acquire()
await lock2.acquire()
lock1.release() # 释放锁以触发死锁场景
lock2.release()
try:
await asyncio.gather(task1(), task2())
except asyncio.TimeoutError:
print("Deadlock detected and broken by timeout")
asyncio.run(deadlock_with_timeout())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 2. 周期性检测死锁状态(主动破坏循环等待)
通过记录锁的占用状态,定期检查是否存在循环等待链。
import asyncio
from collections import deque
class DeadlockDetector:
def __init__(self):
self.lock_owners = {} # 记录锁的当前持有者
self.waiting_tasks = {} # 记录等待锁的任务
def register_lock(self, lock, task):
self.lock_owners[lock] = task
def register_wait(self, lock, task):
self.waiting_tasks[task] = lock
def detect_cycle(self):
# 简化版:检查是否存在任务等待链
visited = set()
for task in self.waiting_tasks:
if task in visited:
continue
path = set()
current = task
while current in self.waiting_tasks:
if current in path:
# 发现循环等待
return True
path.add(current)
current = self.lock_owners.get(self.waiting_tasks[current])
visited.update(path)
return False
async def deadlock_detection():
detector = DeadlockDetector()
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()
async def task1():
await lock1.acquire()
detector.register_lock(lock1, task1)
try:
# 模拟等待lock2
detector.register_wait(lock2, task1)
await asyncio.sleep(0.1) # 等待lock2被task2获取
finally:
lock1.release()
async def task2():
await lock2.acquire()
detector.register_lock(lock2, task2)
try:
# 模拟等待lock1
detector.register_wait(lock1, task2)
await asyncio.sleep(0.1) # 等待lock1被task1获取
finally:
lock2.release()
# 启动检测协程,定期检查死锁
async def detection_loop():
while True:
await asyncio.sleep(0.5)
if detector.detect_cycle():
print("Deadlock detected! Forcing exit...")
# 强制取消所有任务
for task in [task1, task2]:
task.cancel()
break
detect_task = asyncio.create_task(detection_loop())
tasks = [task1(), task2()]
try:
await asyncio.gather(*tasks, detect_task)
except asyncio.CancelledError:
print("Tasks cancelled due to deadlock")
asyncio.run(deadlock_detection())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# 3. 使用信号强制终止(终极方案)
当其他方案无效时,通过系统信号(如SIGINT
)强制终止程序,适用于长时间无响应的死锁。
import asyncio
import signal
import sys
async def deadlock_task():
lock = asyncio.Lock()
async with lock:
print("Task acquired lock")
await asyncio.sleep(10) # 模拟死锁等待
async with lock: # 再次获取同一锁,导致死锁
print("This line will never execute")
async def main():
task = asyncio.create_task(deadlock_task())
# 注册信号处理器
def signal_handler(sig, frame):
print(f"Received signal {sig}, forcing exit")
task.cancel()
sys.exit(0)
for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, signal_handler)
try:
await task
except asyncio.CancelledError:
print("Task cancelled, exiting")
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
使用场景:当死锁导致程序完全无响应时,通过Ctrl+C
触发SIGINT
终止进程。
# 三、预防死锁的设计原则
- 按顺序获取锁:所有协程以相同顺序获取多个锁,避免循环等待。
- 避免嵌套锁:减少同一协程获取多个锁的场景。
- 使用超时机制:在获取资源时始终设置超时,如
asyncio.wait_for(lock.acquire(), timeout=5)
。 - 资源分级管理:将资源按优先级编号,协程只能按升序获取锁。
# 总结
死锁退出的核心是打破循环等待链,可通过以下策略实现:
- 超时机制:为资源获取设置时间限制,超时后主动释放已持有资源。
- 动态检测:通过记录锁与任务的依赖关系,定期检查循环等待。
- 强制终止:利用系统信号从外部中断死锁进程。
实际开发中,应优先通过设计避免死锁(如顺序获取锁、超时控制),当死锁发生时,结合超时检测与信号处理实现安全退出。
# 在设计异步协程时,需要考虑的因素有哪些
在设计异步协程时,需要综合考虑性能、可靠性、资源管理和代码可维护性。以下是关键考虑因素及应对策略:
# 一、任务特性分析
区分 I/O 密集型 vs CPU 密集型
- I/O 密集型:适合协程,利用
await
让出控制权,如网络请求、文件读写。 - CPU 密集型:协程无法突破 GIL 限制,需配合
asyncio.to_thread()
或多进程。
import asyncio async def io_bound_task(): await asyncio.sleep(1) # 模拟 I/O 等待 async def cpu_bound_task(): # 错误做法:CPU 密集型操作阻塞事件循环 # result = heavy_computation() # 正确做法:放到线程池执行 loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, heavy_computation) return result
1
2
3
4
5
6
7
8
9
10
11
12
13- I/O 密集型:适合协程,利用
任务间依赖关系
- 串行执行:使用
await
顺序调用协程。 - 并行执行:使用
asyncio.gather()
并发运行,但需注意共享资源竞争。
- 串行执行:使用
# 二、并发控制与资源管理
限制并发数
- 使用
asyncio.Semaphore
控制同时运行的协程数量,避免资源耗尽。
async def worker(semaphore, task_id): async with semaphore: # 限制最多同时运行 10 个协程 print(f"Processing task {task_id}") await asyncio.sleep(1) async def main(): semaphore = asyncio.Semaphore(10) tasks = [worker(semaphore, i) for i in range(100)] await asyncio.gather(*tasks)
1
2
3
4
5
6
7
8
9- 使用
连接池与资源复用
- 数据库连接、HTTP 客户端等昂贵资源应复用,避免频繁创建和销毁。
import aiohttp async def fetch(session, url): async with session.get(url) as response: return await response.text() async def main(): async with aiohttp.ClientSession() as session: # 复用 session tasks = [fetch(session, f"https://api.example.com/{i}") for i in range(10)] await asyncio.gather(*tasks)
1
2
3
4
5
6
7
8
9
10
# 三、异常处理与容错
全局异常捕获
- 使用
asyncio.gather(return_exceptions=True)
捕获所有任务的异常,避免一个失败导致整体崩溃。
async def task(): raise ValueError("Oops!") async def main(): results = await asyncio.gather(task(), return_exceptions=True) for result in results: if isinstance(result, Exception): print(f"Caught exception: {result}")
1
2
3
4
5
6
7
8- 使用
超时控制
- 使用
asyncio.wait_for()
防止协程无限阻塞。
async def slow_task(): await asyncio.sleep(10) async def main(): try: await asyncio.wait_for(slow_task(), timeout=3) except asyncio.TimeoutError: print("Task timed out")
1
2
3
4
5
6
7
8- 使用
重试机制
- 对可能失败的操作实现指数退避重试。
import random async def fetch_with_retry(url, retries=3): for i in range(retries): try: return await fetch(url) except Exception as e: if i == retries - 1: raise wait_time = 2 ** i + random.random() # 指数退避 await asyncio.sleep(wait_time)
1
2
3
4
5
6
7
8
9
10
11
# 四、优雅退出与资源清理
信号处理
- 监听系统信号(如
SIGINT
),主动触发协程取消。
async def main(): task = asyncio.create_task(run_service()) loop = asyncio.get_running_loop() signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) for s in signals: loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s, loop, task))) await task async def shutdown(signal, loop, task): print(f"Received {signal.name}, shutting down...") task.cancel() await task loop.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15- 监听系统信号(如
上下文管理器
- 使用
async with
确保资源在协程取消时正确释放。
class DatabaseConnection: async def __aenter__(self): await self.connect() return self async def __aexit__(self, exc_type, exc, tb): await self.disconnect()
1
2
3
4
5
6
7- 使用
# 五、调试与监控
日志与调试工具
- 启用
asyncio
的调试模式:loop.set_debug(True)
- 使用结构化日志记录协程状态:
import logging logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s" ) logger = logging.getLogger(__name__) async def task(): logger.info("Starting task") await asyncio.sleep(1) logger.info("Task completed")
1
2
3
4
5
6
7
8
9
10
11
12- 启用
性能分析
- 使用
asyncio
的current_task()
和all_tasks()
监控运行状态。 - 记录协程执行时间,识别性能瓶颈。
- 使用
# 六、测试策略
单元测试
- 使用
pytest-asyncio
编写异步测试:
import pytest @pytest.mark.asyncio async def test_async_function(): result = await async_function() assert result == expected
1
2
3
4
5
6- 使用
模拟异步依赖
- 使用
unittest.mock
模拟异步函数:
from unittest.mock import MagicMock async def test_fetch(): mock_session = MagicMock() mock_response = MagicMock() mock_response.text.return_value = "data" mock_session.get.return_value.__aenter__.return_value = mock_response result = await fetch(mock_session, "url") assert result == "data"
1
2
3
4
5
6
7
8
9
10- 使用
# 七、与同步代码的交互
线程池执行同步代码
- 使用
asyncio.to_thread()
执行阻塞的同步函数:
async def main(): result = await asyncio.to_thread(sync_function, arg1, arg2)
1
2- 使用
事件循环集成
- 避免在异步代码中直接调用阻塞函数,需将其包装为协程。
# 八、框架选择与生态
- HTTP 服务:选择
FastAPI
、aiohttp
等原生支持异步的框架。 - 数据库:使用
asyncpg
(PostgreSQL)、motor
(MongoDB)等异步驱动。 - 消息队列:通过
aiokafka
、aio_pika
实现异步通信。
# 设计 checklist
- ✅ 是否所有 I/O 操作都使用异步版本?
- ✅ 是否控制了最大并发数,避免资源耗尽?
- ✅ 是否处理了所有可能的异常和超时?
- ✅ 是否实现了优雅退出机制?
- ✅ 是否有足够的监控和日志?
- ✅ 是否考虑了与现有同步代码的兼容性?
合理设计异步协程能显著提升系统吞吐量和响应性,但需谨慎处理并发、资源和错误场景,确保代码健壮且易于维护。
# 总结
asyncio 协程面试题核心围绕事件循环机制、协程生命周期管理、并发控制、异常处理及与其他编程模型的对比。理解这些概念不仅能应对面试,更能在实际开发中高效运用异步编程优化 IO 密集型场景的性能。