别院牧志知识库 别院牧志知识库
首页
  • 基础

    • 全栈之路
    • 😎Awesome资源
  • 进阶

    • Python 工匠系列
    • 高阶知识点
  • 指南教程

    • Socket 编程
    • 异步编程
    • PEP 系列
  • Python 面试题
  • 2025 面试记录
  • 2022 面试记录
  • 2021 面试记录
  • 2020 面试记录
  • 2019 面试记录
  • 数据库索引原理
  • 基金

    • 基金知识
    • 基金经理
  • 细读经典

    • 德隆-三个知道
    • 孔曼子-摊大饼理论
    • 配置者说-躺赢之路
    • 资水-建立自己的投资体系
    • 反脆弱
  • Git 参考手册
  • 提问的智慧
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
首页
  • 基础

    • 全栈之路
    • 😎Awesome资源
  • 进阶

    • Python 工匠系列
    • 高阶知识点
  • 指南教程

    • Socket 编程
    • 异步编程
    • PEP 系列
  • Python 面试题
  • 2025 面试记录
  • 2022 面试记录
  • 2021 面试记录
  • 2020 面试记录
  • 2019 面试记录
  • 数据库索引原理
  • 基金

    • 基金知识
    • 基金经理
  • 细读经典

    • 德隆-三个知道
    • 孔曼子-摊大饼理论
    • 配置者说-躺赢之路
    • 资水-建立自己的投资体系
    • 反脆弱
  • Git 参考手册
  • 提问的智慧
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 辨析

  • Sockets编程

  • Django

  • stackoverflow

  • Flask

  • 全栈之路

  • 面试

  • 代码片段

  • 异步编程

    • 序言
    • 平凡之路——asyncio 的演进
    • 同步/异步与阻塞/非阻塞的区别?
    • Python 线程同步机制:锁、RLocks、信号量、条件和队列
    • Python 对异步 I/O 的优化之路
    • 为什么要采用异步编程?
    • 深入理解 Python 异步编程(上)
    • 深入理解 Python 异步编程(中)
    • 深入理解 Python 异步编程(下)
    • asyncio 协程面试题
      • 1. 什么是异步编程?asyncio 与多线程、多进程的区别是什么
        • 协程调度原理
        • GIL 对协程的影响
      • 2. 解释 async/await 的工作原理,与生成器有什么关系
      • 3. 事件循环(Event Loop)的作用是什么?如何在 asyncio 中获取事件循环
      • 4. Task 和 Future 的区别是什么?如何创建 Task
      • 5. 如何处理异步任务中的异常?未处理的异常会导致什么问题
      • 6. asyncio 中如何实现并发控制?比如限制同时运行的任务数
      • 7. 异步上下文管理器(Async Context Manager)和异步迭代器(Async Iterator)的作用是什么
      • 8. asyncio.sleep()是如何实现非阻塞的?与 time.sleep()的区别是什么
      • 9. 异步编程的优势和适用场景是什么?有哪些局限性
      • 10. 请对比 asyncio.gather()和 asyncio.wait()的用法和区别
      • 11. 如何在 asyncio 中处理超时?请举例说明
      • 12. 简述 asyncio 的底层实现原理(事件循环、IO 多路复用等)
      • 13. 如何在 asyncio 中集成同步代码?有哪些注意事项
      • 多个协程去操作同一数据,保持数据的原子性,是如何实现的?
        • 1. 使用异步锁(`asyncio.Lock`)
        • 2. 原子操作(使用不可中断的操作)
        • 3. 使用队列(`asyncio.Queue`)
        • 4. 不可变数据结构+原子替换
        • 5. 使用线程安全的数据结构
        • 总结
      • 协程调度如何优雅退出?
        • 1. 使用 `asyncio.gather` 的 `return_exceptions=True`
        • 2. 注册信号处理器
        • 3. 使用 `async with` 管理资源
        • 4. 逐级取消任务层次结构
        • 5. 设置超时退出
        • 总结
      • 协程在死锁状态下如何退出?
        • 一、死锁的典型场景与原因
        • 二、死锁退出方案
        • 三、预防死锁的设计原则
        • 总结
      • 在设计异步协程时,需要考虑的因素有哪些
        • 一、任务特性分析
        • 二、并发控制与资源管理
        • 三、异常处理与容错
        • 四、优雅退出与资源清理
        • 五、调试与监控
        • 六、测试策略
        • 七、与同步代码的交互
        • 八、框架选择与生态
        • 设计 checklist
      • 总结
  • 😎Awesome资源

  • PEP

  • Python工匠系列

  • 高阶知识点

  • Python 学习资源待整理
  • 设计模式

  • 好“艹蛋”的 Python 呀!
  • FIFO | 待学清单📝
  • pip 安装及使用
  • 数据分析

  • 源码阅读计划

  • OOP

  • 关于 python 中的 setup.py
  • 并行分布式框架 Celery
  • 七种武器,让你的代码提高可维护性
  • 使用 pdb 调试 Python 代码
  • 每周一个 Python 标准库
  • 🐍Python
  • 异步编程
佚名
2020-11-01
目录

asyncio 协程面试题

# 1. 什么是异步编程?asyncio 与多线程、多进程的区别是什么

回答:
异步编程是一种非阻塞的编程模式,通过事件循环调度任务,允许程序在等待 IO 操作(如网络请求、文件读写)时继续执行其他任务,避免线程阻塞。

  • asyncio 与多线程的区别:

    • 线程模型:多线程通过操作系统内核调度,每个线程有独立栈空间,适合 IO 密集型和 CPU 密集型任务,但存在线程切换开销和锁竞争问题。
    • asyncio 模型:单线程事件循环,通过协程(coroutine)实现任务切换,切换开销极低,适合高并发 IO 场景,但 CPU 密集型任务会阻塞事件循环。
  • asyncio 与多进程的区别:

    • 多进程利用多核 CPU,适合 CPU 密集型任务,但进程间通信复杂,内存占用高。
    • asyncio 单线程模型无法利用多核,但可通过loop.run_in_executor()结合线程池/进程池处理 CPU 任务。

# 协程调度原理

  • 非抢占式执行:协程的执行权转移,要靠显式的await语句。只有当协程主动让出控制权时,事件循环才会去调度其他协程。
  • 事件循环驱动:事件循环就像协程调度的 “指挥官”,它负责管理协程的状态,并且在协程可以继续执行(比如 I/O 操作完成)时恢复其执行。
  • 异步 I/O 绑定:协程主要应用于 I/O 密集型任务。在进行 I/O 操作时,协程会暂停执行,此时事件循环就能去处理其他任务了。

# GIL 对协程的影响

  • 不影响 I/O 密集型任务:因为协程在等待 I/O 操作时,会主动释放控制权,所以 GIL 不会对其产生影响。多个协程可以在同一个线程中高效地处理 I/O 操作。
  • CPU 密集型任务受限:如果协程中包含 CPU 密集型代码,那么 GIL 会导致同一时刻只有一个线程能执行 Python 字节码。不过,这种情况下可以通过asyncio.to_thread()将 CPU 密集型任务放到线程池中执行。
import asyncio

async def task1():
    print("Task 1: Start")
    await asyncio.sleep(1)  # 主动让出控制权
    print("Task 1: End")

async def task2():
    print("Task 2: Start")
    await asyncio.sleep(0.5)  # 主动让出控制权
    print("Task 2: End")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 2. 解释 async/await 的工作原理,与生成器有什么关系

回答:

  • async def定义的是原生协程(coroutine),await用于挂起协程并等待另一个协程的结果,本质是通过事件循环调度协程的执行。
  • 在 Python 3.4 中,asyncio 最初基于生成器(generator)实现协程(如@asyncio.coroutine和yield from),而 Python 3.5+引入的async/await是原生协程语法,更简洁且性能更好。
  • 原生协程与生成器的区别:
    • 生成器通过yield产出值,通过send()传入值;
    • 原生协程通过await等待结果,只能在async def函数中使用,且不兼容生成器的send()方法。

# 3. 事件循环(Event Loop)的作用是什么?如何在 asyncio 中获取事件循环

回答:
事件循环是 asyncio 的核心,负责调度协程的执行,监听 IO 事件并在事件就绪时恢复协程。

  • 获取事件循环的方式:

    # Python 3.7+推荐方式
    import asyncio
    
    # 获取当前线程的事件循环
    loop = asyncio.get_event_loop()
    
    # 创建新的事件循环(不推荐在生产环境直接使用)
    loop = asyncio.new_event_loop()
    
    # 运行事件循环直到协程完成
    result = loop.run_until_complete(coroutine())
    
    # 关闭事件循环(程序结束前调用)
    loop.close()
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
  • 注意: 在 Windows 系统中,默认事件循环为ProactorEventLoop,而 Unix 系统为SelectorEventLoop,某些 API 可能存在兼容性差异。

# 4. Task 和 Future 的区别是什么?如何创建 Task

回答:

  • Future:表示一个异步操作的最终结果,是一个可等待对象(awaitable),通常由底层操作自动创建。

  • Task:是 Future 的子类,用于包装协程并将其加入事件循环调度,可理解为“正在运行的协程”。

  • 创建 Task 的方式:

    import asyncio
    
    async def my_coroutine():
        await asyncio.sleep(1)
        return "Done"
    
    # 方式1:使用loop.create_task()
    loop = asyncio.get_event_loop()
    task = loop.create_task(my_coroutine())
    
    # 方式2:使用asyncio.create_task()(Python 3.7+)
    task = asyncio.create_task(my_coroutine())
    
    # 等待Task完成
    loop.run_until_complete(task)
    print(task.result())  # 输出 "Done"
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16

# 5. 如何处理异步任务中的异常?未处理的异常会导致什么问题

回答:

  • 异常处理方式:

    1. 使用try/except捕获协程内的异常:

      async def task_with_exception():
          try:
              await asyncio.sleep(1)
              raise ValueError("异常示例")
          except Exception as e:
              print(f"捕获异常: {e}")
      
      1
      2
      3
      4
      5
      6
    2. 对 Task 使用add_done_callback()处理异常:

      def handle_exception(task):
          if task.exception():
              print(f"Task异常: {task.exception()}")
              
      task = asyncio.create_task(task_with_exception())
      task.add_done_callback(handle_exception)
      
      1
      2
      3
      4
      5
      6
    3. 使用asyncio.gather()的return_exceptions参数:

      async def main():
          tasks = [task_with_exception(), another_task()]
          results = await asyncio.gather(*tasks, return_exceptions=True)
          for result in results:
              if isinstance(result, Exception):
                  print(f"处理结果中的异常: {result}")
      
      1
      2
      3
      4
      5
      6
  • 未处理异常的影响: 若协程抛出异常且未被捕获,事件循环会抛出RuntimeError并可能导致程序崩溃,因此必须确保异常被正确处理。

# 6. asyncio 中如何实现并发控制?比如限制同时运行的任务数

回答:

  • 使用信号量(Semaphore):
    信号量可控制并发任务的数量,适合限制对外部资源(如 API 接口、数据库连接)的访问频率。

    import asyncio
    
    async def task_with_semaphore(semaphore, id):
        async with semaphore:
            print(f"任务 {id} 开始执行")
            await asyncio.sleep(1)
            print(f"任务 {id} 执行完成")
    
    async def main():
        # 限制最多3个任务并发
        semaphore = asyncio.Semaphore(3)
        tasks = [task_with_semaphore(semaphore, i) for i in range(5)]
        await asyncio.gather(*tasks)
    
    asyncio.run(main())
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
  • 其他方式:

    • 使用asyncio.Semaphore配合asyncio.wait()手动管理任务队列;
    • 对耗时任务使用loop.run_in_executor()提交到线程池/进程池,避免阻塞事件循环。

# 7. 异步上下文管理器(Async Context Manager)和异步迭代器(Async Iterator)的作用是什么

回答:

  • 异步上下文管理器:
    通过async with语句使用,用于管理异步资源的生命周期(如连接池、文件句柄),需实现__aenter__()和__aexit__()异步方法。

    import asyncio
    
    class AsyncDatabase:
        async def __aenter__(self):
            print("连接数据库")
            return self
            
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            print("关闭数据库连接")
            
        async def query(self):
            await asyncio.sleep(0.1)
            return "查询结果"
    
    async def main():
        async with AsyncDatabase() as db:
            result = await db.query()
            print(result)
    
    asyncio.run(main())
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
  • 异步迭代器:
    通过async for语句迭代异步生成器,需实现__aiter__()和__anext__()方法,适用于流式处理异步数据(如网络数据流)。

    async def async_generator():
        for i in range(3):
            await asyncio.sleep(0.1)
            yield i
    
    async def main():
        async for item in async_generator():
            print(item)
    
    asyncio.run(main())
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

# 8. asyncio.sleep()是如何实现非阻塞的?与 time.sleep()的区别是什么

回答:

  • asyncio.sleep(delay)是一个协程,通过事件循环注册一个定时器事件,在指定延迟后恢复协程执行,不会阻塞事件循环。

  • time.sleep(delay)是阻塞函数,会暂停当前线程,导致事件循环无法调度其他协程,在 asyncio 中使用会造成整个程序阻塞。

  • 示例对比:

    import asyncio
    import time
    
    async def async_sleep():
        print(f"开始异步睡眠: {time.strftime('%X')}")
        await asyncio.sleep(1)  # 非阻塞
        print(f"结束异步睡眠: {time.strftime('%X')}")
    
    def sync_sleep():
        print(f"开始同步睡眠: {time.strftime('%X')}")
        time.sleep(1)  # 阻塞线程
        print(f"结束同步睡眠: {time.strftime('%X')}")
    
    async def main():
        # 异步睡眠不阻塞其他任务
        await asyncio.gather(async_sleep(), async_sleep())
        
        # 同步睡眠会阻塞事件循环
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, sync_sleep)  # 需放入线程池执行
    
    asyncio.run(main())
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22

# 9. 异步编程的优势和适用场景是什么?有哪些局限性

回答:

  • 优势:

    • 高并发处理 IO 密集型任务,如网络请求、数据库操作、文件读写;
    • 单线程模型减少线程切换开销,内存占用低;
    • 代码结构更简洁,避免回调地狱(Callback Hell)。
  • 适用场景:

    • 网络服务(Web 服务器、API 客户端);
    • 实时数据处理(日志收集、消息队列);
    • 分布式系统中的异步通信。
  • 局限性:

    • 不适合 CPU 密集型任务(需配合线程池/进程池);
    • 调试难度较高,异常堆栈可能不完整;
    • 与同步代码集成时需小心处理线程安全问题。

# 10. 请对比 asyncio.gather()和 asyncio.wait()的用法和区别

回答:

特性 asyncio.gather() asyncio.wait()
任务组织方式 接受多个协程/任务作为参数,统一管理 接受任务列表,通过return_when控制返回时机
结果处理 按参数顺序返回结果,异常会直接抛出 返回完成的任务集合,需手动处理结果顺序
异常处理 可通过return_exceptions=True捕获异常 需手动检查每个任务的异常状态
取消任务 调用gather.cancel()取消所有任务 需手动取消未完成的任务
适用场景 简单场景下并行执行任务并获取有序结果 复杂场景下需要精细控制任务状态和顺序

示例对比:

import asyncio

async def task(i):
    await asyncio.sleep(0.1)
    return i

# 使用gather
async def use_gather():
    tasks = [task(i) for i in range(3)]
    results = await asyncio.gather(*tasks)
    print("Gather结果:", results)  # 输出 [0, 1, 2]

# 使用wait
async def use_wait():
    tasks = [task(i) for i in range(3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    results = [task.result() for task in done]
    print("Wait结果:", results)  # 输出 [0, 1, 2](顺序可能不同)

asyncio.run(asyncio.gather(use_gather(), use_wait()))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 11. 如何在 asyncio 中处理超时?请举例说明

回答:
使用asyncio.wait_for()函数为协程设置超时时间,超时会抛出asyncio.TimeoutError。

import asyncio

async def slow_operation():
    await asyncio.sleep(2)  # 模拟耗时操作
    return "完成"

async def main():
    try:
        # 设置1秒超时
        result = await asyncio.wait_for(slow_operation(), timeout=1)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时")

asyncio.run(main())  # 输出 "操作超时"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 12. 简述 asyncio 的底层实现原理(事件循环、IO 多路复用等)

回答:

  • 事件循环核心机制:
    事件循环基于 IO 多路复用(如 Unix 的select/poll/epoll,Windows 的IOCP),通过注册文件描述符(socket、pipe 等)的读写事件,当事件就绪时触发回调函数或恢复协程。
  • 协程调度流程:
    1. 协程通过await挂起,将控制权交还给事件循环;
    2. 事件循环继续处理其他就绪的 IO 事件或定时任务;
    3. 当await的目标完成时,事件循环将协程加入就绪队列,等待调度执行。
  • 与其他异步框架的对比:
    • Node.js 使用单线程事件循环+回调函数,asyncio 使用原生协程+await语法更符合同步编程思维;
    • Tornado 基于回调函数,asyncio 的原生协程更易维护。

# 13. 如何在 asyncio 中集成同步代码?有哪些注意事项

回答:
通过loop.run_in_executor()将同步函数提交到线程池或进程池执行,避免阻塞事件循环。

import asyncio
import time

def sync_function():
    time.sleep(1)  # 同步阻塞函数
    return "同步结果"

async def main():
    loop = asyncio.get_event_loop()
    
    # 提交到默认线程池(ThreadPoolExecutor)
    result1 = await loop.run_in_executor(None, sync_function)
    
    # 提交到自定义进程池(适合CPU密集型任务)
    from concurrent.futures import ProcessPoolExecutor
    with ProcessPoolExecutor() as executor:
        result2 = await loop.run_in_executor(executor, sync_function)
    
    print(result1, result2)

asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

注意事项:

  • 线程池适用于 IO 密集型同步任务,进程池适用于 CPU 密集型任务;
  • 跨线程/进程传递数据需注意序列化问题(如使用 pickle);
  • 避免频繁创建线程池/进程池,建议复用实例。

# 多个协程去操作同一数据,保持数据的原子性,是如何实现的?

在Python中,多个协程操作同一数据时,可以通过以下几种方式保证数据的原子性:

# 1. 使用异步锁(asyncio.Lock)

通过锁机制确保同一时间只有一个协程可以访问共享资源。

示例代码:

import asyncio

shared_data = 0
lock = asyncio.Lock()

async def safe_increment():
    global shared_data
    async with lock:  # 同一时间只允许一个协程执行此代码块
        temp = shared_data
        await asyncio.sleep(0.1)  # 模拟耗时操作
        shared_data = temp + 1

async def main():
    tasks = [safe_increment() for _ in range(10)]
    await asyncio.gather(*tasks)
    print(f"Final value: {shared_data}")  # 输出: 10

asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 2. 原子操作(使用不可中断的操作)

如果操作本身是原子的(如单个变量赋值),则无需额外同步。

示例:

shared_counter = 0

async def atomic_update():
    nonlocal shared_counter
    shared_counter += 1  # Python的 += 操作在CPython中通常是原子的
1
2
3
4
5

注意:复杂操作(如a = a + 1)可能涉及多个步骤,并非原子操作,仍需锁保护。

# 3. 使用队列(asyncio.Queue)

通过队列将共享资源的操作串行化,避免竞争条件。

示例:

import asyncio

queue = asyncio.Queue()
shared_data = []

async def producer():
    for i in range(5):
        await queue.put(i)
        await asyncio.sleep(0.1)

async def consumer():
    while True:
        item = await queue.get()
        shared_data.append(item)  # 所有修改通过队列串行化
        queue.task_done()

async def main():
    task1 = asyncio.create_task(producer())
    task2 = asyncio.create_task(consumer())
    await task1
    await queue.join()
    task2.cancel()
    print(shared_data)  # 输出: [0, 1, 2, 3, 4]

asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 4. 不可变数据结构+原子替换

使用不可变数据结构(如tuple、frozenset),通过原子替换保证线程安全。

示例:

shared_state = ()

async def update_state(new_item):
    nonlocal shared_state
    # 创建新的不可变对象并原子替换
    shared_state = (*shared_state, new_item)
1
2
3
4
5
6

# 5. 使用线程安全的数据结构

对于复杂场景,可以使用concurrent.futures中的线程安全容器(需结合asyncio.to_thread)。

示例:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

thread_safe_queue = Queue()
executor = ThreadPoolExecutor()

async def add_to_queue(item):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, thread_safe_queue.put, item)
1
2
3
4
5
6
7
8
9
10

# 总结

方法 适用场景 示例代码
异步锁 (asyncio.Lock) 保护临界区代码 async with lock: ...
原子操作 简单变量赋值 x += 1
队列 (asyncio.Queue) 串行化操作 await queue.put(data)
不可变数据结构 频繁替换状态 shared_state = new_state
线程安全容器 复杂共享对象 使用queue.Queue + 线程池

最佳实践:优先使用asyncio.Lock或asyncio.Queue,它们专为协程设计,能避免死锁和竞态条件。

# 协程调度如何优雅退出?

在Python中,协程的优雅退出需要考虑任务取消、资源清理和异常处理。以下是几种常见的优雅退出方法:

# 1. 使用 asyncio.gather 的 return_exceptions=True

捕获所有任务的异常,避免一个任务失败导致整个程序崩溃。

import asyncio

async def task(delay):
    try:
        await asyncio.sleep(delay)
        return f"Task completed after {delay}s"
    except asyncio.CancelledError:
        print(f"Task with delay {delay} was cancelled")
        raise  # 重新抛出异常以保持取消状态

async def main():
    tasks = [task(1), task(10)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            print(f"Caught exception: {result}")
        else:
            print(result)

# 运行主函数
asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 2. 注册信号处理器

监听系统信号(如 SIGINT、SIGTERM),主动触发任务取消。

import asyncio
import signal

async def long_running_task():
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task is cleaning up...")
        await asyncio.sleep(0.5)  # 模拟清理操作
        print("Task cleanup completed")
        raise

async def main():
    task = asyncio.create_task(long_running_task())
    
    # 注册信号处理器
    loop = asyncio.get_running_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop, task))
        )
    
    await task

async def shutdown(signal, loop, task):
    print(f"Received exit signal {signal.name}")
    task.cancel()
    await task
    loop.stop()

# 运行主函数
asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 3. 使用 async with 管理资源

通过 async context manager 确保资源在任务取消时被正确释放。

import asyncio

class DatabaseConnection:
    def __init__(self):
        self.connected = False
    
    async def connect(self):
        print("Connecting to database...")
        await asyncio.sleep(0.5)
        self.connected = True
        print("Connected")
    
    async def disconnect(self):
        print("Disconnecting from database...")
        await asyncio.sleep(0.5)
        self.connected = False
        print("Disconnected")
    
    async def __aenter__(self):
        await self.connect()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()

async def worker():
    async with DatabaseConnection() as db:
        try:
            while True:
                print("Processing data...")
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print("Worker is being cancelled")
            raise

# 运行主函数
asyncio.run(worker())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 4. 逐级取消任务层次结构

对于复杂的任务树,需要从顶层到底层逐级取消,确保每个子任务都有机会清理资源。

import asyncio

async def child_task(name):
    try:
        print(f"{name} started")
        await asyncio.sleep(10)
        print(f"{name} completed")
    except asyncio.CancelledError:
        print(f"{name} is being cancelled")
        raise

async def parent_task():
    child1 = asyncio.create_task(child_task("Child 1"))
    child2 = asyncio.create_task(child_task("Child 2"))
    
    try:
        await asyncio.gather(child1, child2)
    except asyncio.CancelledError:
        print("Parent task is being cancelled")
        child1.cancel()
        child2.cancel()
        await asyncio.gather(child1, child2, return_exceptions=True)
        raise

# 运行主函数
asyncio.run(parent_task())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 5. 设置超时退出

使用 asyncio.wait_for 或 asyncio.wait 设置任务最大执行时间。

import asyncio

async def slow_task():
    await asyncio.sleep(10)
    return "Done"

async def main():
    try:
        # 设置任务超时时间为3秒
        result = await asyncio.wait_for(slow_task(), timeout=3)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out")

# 运行主函数
asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 总结

  • 捕获异常:使用 return_exceptions=True 避免任务间的异常传播。
  • 信号处理:监听系统信号,主动触发取消逻辑。
  • 资源管理:通过 async with 确保资源自动释放。
  • 层次取消:对于复杂任务,从顶层逐级取消子任务。
  • 超时控制:使用 wait_for 设置任务最大执行时间。

优雅退出的核心是给每个协程处理取消请求的机会,避免资源泄漏和数据不一致。

# 协程在死锁状态下如何退出?

在Python协程中,死锁通常发生在多个协程互相等待对方释放资源(如锁、信号量),导致所有协程无法继续执行的情况。解决死锁需要打破其必要条件(互斥、请求与保持、不剥夺、循环等待),以下是几种针对性的退出方案:

# 一、死锁的典型场景与原因

import asyncio

async def deadlock_demo():
    lock1 = asyncio.Lock()
    lock2 = asyncio.Lock()
    
    async def task1():
        async with lock1:
            await asyncio.sleep(0.1)
            async with lock2:  # 等待lock2,此时task2在等待lock1
                print("Task1 done")
    
    async def task2():
        async with lock2:
            await asyncio.sleep(0.1)
            async with lock1:  # 等待lock1,此时task1在等待lock2
                print("Task2 done")
    
    # 同时运行两个任务,导致死锁
    await asyncio.gather(task1(), task2())

# 运行此代码会导致死锁,程序无响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

死锁原因:

  • 任务1获取lock1后等待lock2,任务2获取lock2后等待lock1,形成循环等待。

# 二、死锁退出方案

# 1. 为资源获取设置超时(破坏“不剥夺”条件)

通过asyncio.wait_for为获取锁或执行任务设置超时,超时后主动抛出异常打破死锁。

import asyncio

async def deadlock_with_timeout():
    lock1 = asyncio.Lock()
    lock2 = asyncio.Lock()
    
    async def task1():
        try:
            # 为获取lock2设置超时
            async with asyncio.wait_for(lock2.acquire(), timeout=1):
                print("Task1 acquired lock2")
        except asyncio.TimeoutError:
            print("Task1 timeout, releasing lock1")
            lock1.release()  # 主动释放已获取的锁
            raise
    
    async def task2():
        try:
            # 为获取lock1设置超时
            async with asyncio.wait_for(lock1.acquire(), timeout=1):
                print("Task2 acquired lock1")
        except asyncio.TimeoutError:
            print("Task2 timeout, releasing lock2")
            lock2.release()
            raise
    
    # 先获取锁,模拟死锁前的状态
    await lock1.acquire()
    await lock2.acquire()
    lock1.release()  # 释放锁以触发死锁场景
    lock2.release()
    
    try:
        await asyncio.gather(task1(), task2())
    except asyncio.TimeoutError:
        print("Deadlock detected and broken by timeout")

asyncio.run(deadlock_with_timeout())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 2. 周期性检测死锁状态(主动破坏循环等待)

通过记录锁的占用状态,定期检查是否存在循环等待链。

import asyncio
from collections import deque

class DeadlockDetector:
    def __init__(self):
        self.lock_owners = {}  # 记录锁的当前持有者
        self.waiting_tasks = {}  # 记录等待锁的任务
    
    def register_lock(self, lock, task):
        self.lock_owners[lock] = task
    
    def register_wait(self, lock, task):
        self.waiting_tasks[task] = lock
    
    def detect_cycle(self):
        # 简化版:检查是否存在任务等待链
        visited = set()
        for task in self.waiting_tasks:
            if task in visited:
                continue
            path = set()
            current = task
            while current in self.waiting_tasks:
                if current in path:
                    # 发现循环等待
                    return True
                path.add(current)
                current = self.lock_owners.get(self.waiting_tasks[current])
            visited.update(path)
        return False

async def deadlock_detection():
    detector = DeadlockDetector()
    lock1 = asyncio.Lock()
    lock2 = asyncio.Lock()
    
    async def task1():
        await lock1.acquire()
        detector.register_lock(lock1, task1)
        try:
            # 模拟等待lock2
            detector.register_wait(lock2, task1)
            await asyncio.sleep(0.1)  # 等待lock2被task2获取
        finally:
            lock1.release()
    
    async def task2():
        await lock2.acquire()
        detector.register_lock(lock2, task2)
        try:
            # 模拟等待lock1
            detector.register_wait(lock1, task2)
            await asyncio.sleep(0.1)  # 等待lock1被task1获取
        finally:
            lock2.release()
    
    # 启动检测协程,定期检查死锁
    async def detection_loop():
        while True:
            await asyncio.sleep(0.5)
            if detector.detect_cycle():
                print("Deadlock detected! Forcing exit...")
                # 强制取消所有任务
                for task in [task1, task2]:
                    task.cancel()
                break
    
    detect_task = asyncio.create_task(detection_loop())
    tasks = [task1(), task2()]
    
    try:
        await asyncio.gather(*tasks, detect_task)
    except asyncio.CancelledError:
        print("Tasks cancelled due to deadlock")

asyncio.run(deadlock_detection())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

# 3. 使用信号强制终止(终极方案)

当其他方案无效时,通过系统信号(如SIGINT)强制终止程序,适用于长时间无响应的死锁。

import asyncio
import signal
import sys

async def deadlock_task():
    lock = asyncio.Lock()
    async with lock:
        print("Task acquired lock")
        await asyncio.sleep(10)  # 模拟死锁等待
        async with lock:  # 再次获取同一锁,导致死锁
            print("This line will never execute")

async def main():
    task = asyncio.create_task(deadlock_task())
    
    # 注册信号处理器
    def signal_handler(sig, frame):
        print(f"Received signal {sig}, forcing exit")
        task.cancel()
        sys.exit(0)
    
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, signal_handler)
    
    try:
        await task
    except asyncio.CancelledError:
        print("Task cancelled, exiting")

asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

使用场景:当死锁导致程序完全无响应时,通过Ctrl+C触发SIGINT终止进程。

# 三、预防死锁的设计原则

  1. 按顺序获取锁:所有协程以相同顺序获取多个锁,避免循环等待。
  2. 避免嵌套锁:减少同一协程获取多个锁的场景。
  3. 使用超时机制:在获取资源时始终设置超时,如asyncio.wait_for(lock.acquire(), timeout=5)。
  4. 资源分级管理:将资源按优先级编号,协程只能按升序获取锁。

# 总结

死锁退出的核心是打破循环等待链,可通过以下策略实现:

  • 超时机制:为资源获取设置时间限制,超时后主动释放已持有资源。
  • 动态检测:通过记录锁与任务的依赖关系,定期检查循环等待。
  • 强制终止:利用系统信号从外部中断死锁进程。

实际开发中,应优先通过设计避免死锁(如顺序获取锁、超时控制),当死锁发生时,结合超时检测与信号处理实现安全退出。

# 在设计异步协程时,需要考虑的因素有哪些

在设计异步协程时,需要综合考虑性能、可靠性、资源管理和代码可维护性。以下是关键考虑因素及应对策略:

# 一、任务特性分析

  1. 区分 I/O 密集型 vs CPU 密集型

    • I/O 密集型:适合协程,利用 await 让出控制权,如网络请求、文件读写。
    • CPU 密集型:协程无法突破 GIL 限制,需配合 asyncio.to_thread() 或多进程。
    import asyncio
    
    async def io_bound_task():
        await asyncio.sleep(1)  # 模拟 I/O 等待
    
    async def cpu_bound_task():
        # 错误做法:CPU 密集型操作阻塞事件循环
        # result = heavy_computation()
        
        # 正确做法:放到线程池执行
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, heavy_computation)
        return result
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
  2. 任务间依赖关系

    • 串行执行:使用 await 顺序调用协程。
    • 并行执行:使用 asyncio.gather() 并发运行,但需注意共享资源竞争。

# 二、并发控制与资源管理

  1. 限制并发数

    • 使用 asyncio.Semaphore 控制同时运行的协程数量,避免资源耗尽。
    async def worker(semaphore, task_id):
        async with semaphore:  # 限制最多同时运行 10 个协程
            print(f"Processing task {task_id}")
            await asyncio.sleep(1)
    
    async def main():
        semaphore = asyncio.Semaphore(10)
        tasks = [worker(semaphore, i) for i in range(100)]
        await asyncio.gather(*tasks)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
  2. 连接池与资源复用

    • 数据库连接、HTTP 客户端等昂贵资源应复用,避免频繁创建和销毁。
    import aiohttp
    
    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.text()
    
    async def main():
        async with aiohttp.ClientSession() as session:  # 复用 session
            tasks = [fetch(session, f"https://api.example.com/{i}") for i in range(10)]
            await asyncio.gather(*tasks)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

# 三、异常处理与容错

  1. 全局异常捕获

    • 使用 asyncio.gather(return_exceptions=True) 捕获所有任务的异常,避免一个失败导致整体崩溃。
    async def task():
        raise ValueError("Oops!")
    
    async def main():
        results = await asyncio.gather(task(), return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(f"Caught exception: {result}")
    
    1
    2
    3
    4
    5
    6
    7
    8
  2. 超时控制

    • 使用 asyncio.wait_for() 防止协程无限阻塞。
    async def slow_task():
        await asyncio.sleep(10)
    
    async def main():
        try:
            await asyncio.wait_for(slow_task(), timeout=3)
        except asyncio.TimeoutError:
            print("Task timed out")
    
    1
    2
    3
    4
    5
    6
    7
    8
  3. 重试机制

    • 对可能失败的操作实现指数退避重试。
    import random
    
    async def fetch_with_retry(url, retries=3):
        for i in range(retries):
            try:
                return await fetch(url)
            except Exception as e:
                if i == retries - 1:
                    raise
                wait_time = 2 ** i + random.random()  # 指数退避
                await asyncio.sleep(wait_time)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

# 四、优雅退出与资源清理

  1. 信号处理

    • 监听系统信号(如 SIGINT),主动触发协程取消。
    async def main():
        task = asyncio.create_task(run_service())
        
        loop = asyncio.get_running_loop()
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
        for s in signals:
            loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s, loop, task)))
        
        await task
    
    async def shutdown(signal, loop, task):
        print(f"Received {signal.name}, shutting down...")
        task.cancel()
        await task
        loop.stop()
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
  2. 上下文管理器

    • 使用 async with 确保资源在协程取消时正确释放。
    class DatabaseConnection:
        async def __aenter__(self):
            await self.connect()
            return self
        
        async def __aexit__(self, exc_type, exc, tb):
            await self.disconnect()
    
    1
    2
    3
    4
    5
    6
    7

# 五、调试与监控

  1. 日志与调试工具

    • 启用 asyncio 的调试模式:loop.set_debug(True)
    • 使用结构化日志记录协程状态:
    import logging
    
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s"
    )
    logger = logging.getLogger(__name__)
    
    async def task():
        logger.info("Starting task")
        await asyncio.sleep(1)
        logger.info("Task completed")
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
  2. 性能分析

    • 使用 asyncio 的 current_task() 和 all_tasks() 监控运行状态。
    • 记录协程执行时间,识别性能瓶颈。

# 六、测试策略

  1. 单元测试

    • 使用 pytest-asyncio 编写异步测试:
    import pytest
    
    @pytest.mark.asyncio
    async def test_async_function():
        result = await async_function()
        assert result == expected
    
    1
    2
    3
    4
    5
    6
  2. 模拟异步依赖

    • 使用 unittest.mock 模拟异步函数:
    from unittest.mock import MagicMock
    
    async def test_fetch():
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.text.return_value = "data"
        mock_session.get.return_value.__aenter__.return_value = mock_response
        
        result = await fetch(mock_session, "url")
        assert result == "data"
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

# 七、与同步代码的交互

  1. 线程池执行同步代码

    • 使用 asyncio.to_thread() 执行阻塞的同步函数:
    async def main():
        result = await asyncio.to_thread(sync_function, arg1, arg2)
    
    1
    2
  2. 事件循环集成

    • 避免在异步代码中直接调用阻塞函数,需将其包装为协程。

# 八、框架选择与生态

  1. HTTP 服务:选择 FastAPI、aiohttp 等原生支持异步的框架。
  2. 数据库:使用 asyncpg(PostgreSQL)、motor(MongoDB)等异步驱动。
  3. 消息队列:通过 aiokafka、aio_pika 实现异步通信。

# 设计 checklist

  1. ✅ 是否所有 I/O 操作都使用异步版本?
  2. ✅ 是否控制了最大并发数,避免资源耗尽?
  3. ✅ 是否处理了所有可能的异常和超时?
  4. ✅ 是否实现了优雅退出机制?
  5. ✅ 是否有足够的监控和日志?
  6. ✅ 是否考虑了与现有同步代码的兼容性?

合理设计异步协程能显著提升系统吞吐量和响应性,但需谨慎处理并发、资源和错误场景,确保代码健壮且易于维护。

# 总结

asyncio 协程面试题核心围绕事件循环机制、协程生命周期管理、并发控制、异常处理及与其他编程模型的对比。理解这些概念不仅能应对面试,更能在实际开发中高效运用异步编程优化 IO 密集型场景的性能。

编辑 (opens new window)
#异步#asyncio#协程#面试
上次更新: 2025-06-26, 03:39:47
深入理解 Python 异步编程(下)
Python 资源大全中文版

← 深入理解 Python 异步编程(下) Python 资源大全中文版→

最近更新
01
本事与投资
06-20
02
Flask 运行周期及工作原理
06-05
03
支付系统策略模式实现
06-04
更多文章>
Theme by Vdoing | Copyright © 2019-2025 IMOYAO | 别院牧志
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式