[译] PEP 525--异步生成器
PEP: | 525 |
---|---|
Title: | Asynchronous Generators |
Author: | Yury Selivanov <yury at edgedb.com> |
Discussions-To: | <python-dev at python.org> |
Status: | Final |
Type: | Standards Track |
Created: | 28-Jul-2016 |
Python-Version: | 3.6 |
Post-History: | 02-Aug-2016, 23-Aug-2016, 01-Sep-2016, 06-Sep-2016 |
译者 :CXA (opens new window)(python 学习开发 公众号作者)
# 简述
PEP492 引入了对 Python 3.5 的原生协程和 async/await 句法的支持。本次提案添加了对异步生成器的支持进而来扩展 Python 的异步功能。
# 理论和目标
常规生成器(在 PEP 255 中引入)的实现,使得编写复杂数据变得更优雅,它们的行为类似于迭代器。 当时没有提供 async for 使用的异步生成器。 编写异步数据生成器变得非常复杂,因为必须定义一个实现__aiter__和__anext__的方法,才能在 async for 语句中使用它。 为了说明异步生成器的重要性,专门做了性能测试,测试结果表明使用异步生成器要比使用异步迭代器快 2 倍多。 下面的代码是演示了在迭代的过程中等待几秒
class Ticker:
"""Yield numbers from 0 to `to` every `delay` seconds."""
def __init__(self, delay, to):
self.delay = delay
self.i = 0
self.to = to
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= self.to:
raise StopAsyncIteration
self.i += 1
if i:
await asyncio.sleep(self.delay)
return i
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
我们那可以使用下面的代码实现同样的功能:
async def ticker(delay, to):
"""Yield numbers from 0 to `to` every `delay` seconds."""
for i in range(to):
yield i
await asyncio.sleep(delay)
2
3
4
5
# 详细说明
# 异步生成器
我们直到在函数中使用一个或多个 yield 该函数将变成一个生成器。
def func(): # 方法
return
def genfunc(): # 生成器方法
yield
2
3
4
5
我们提议使用类似的功能实现下面异步生成器:
async def coro(): # 一个协程方法
await smth()
async def asyncgen(): # 一个异步生成器方法
await smth()
yield 42
2
3
4
5
6
调用异步生成器函数的结果是异步生成器对象,它实现了 PEP 492 中定义的异步迭代协议。 注意:在异步生成器中使用非空 return 语句会引发 SyntaxError 错误。
# 对异步迭代协议的支持
该协议需要实现两种特殊方法: __aiter__方法返回一个异步迭代器。 __anext__方法返回一个 awaitable 对象,它使用 StopIteration 异常来捕获 yield 的值,使用 StopAsyncIteration 异常来表示迭代结束。 异步生成器定义了这两种方法。 让我们实现一个一个简单的异步生成器:
import asyncio
async def genfunc():
yield 1
yield 2
gen = genfunc()
async def start():
assert gen.__aiter__() is gen
assert await gen.__anext__() == 1
assert await gen.__anext__() == 2
await gen.__anext__() # This line will raise StopAsyncIteration.
if __name__ == '__main__':
asyncio.run(start())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 终止
PEP 492 提到需要使用事件循环或调度程序来运行协程。 因为异步生成器是在协程使用的,所以还需要创建一个事件循环来运行。 异步生成器可以有 try..finally 块,也可以用 async with 异步上下文管理代码快。 重要的是提供一种保证,即使在部分迭代时,也可以进行垃圾收集,生成器可以安全终止。
async def square_series(con, to):
async with con.transaction():
cursor = con.cursor(
'SELECT generate_series(0, $1) AS i', to)
async for row in cursor:
yield row['i'] ** 2
async for i in square_series(con, 1000):
if i == 100:
break
2
3
4
5
6
7
8
9
10
上面代码演示了异步生成器在 async with 中使用,然后使用 async for 对异步生成器对象进行迭代处理,同时我们也可以设置一个中断条件。 square_series()生成器将被垃圾收集,并没有异步关闭生成器的机制,Python 解释器将无法执行任何操作。 为了解决这个问题,这里提出以下改进建议: 1.在异步生成器上实现一个 aclose 方法,返回一个特殊 awaittable 对象。 当 awaitable 抛出 GeneratorExit 异常的时候,抛出到挂起的生成器中并对其进行迭代,直到发生 GeneratorExit 或 StopAsyncIteration。这就是在常规函数中使用 close 方法关闭对象一样,只不过 aclose 需要一个事件循环去执行。 2.不要在异步生成器中使用 yield 语句,只能用 await。 3.在 sys 模块中加两个方法:set_asyncgen_hooks() and get_asyncgen_hooks(). sys.set_asyncgen_hooks()背后的思想是允许事件循环拦截异步生成器的迭代和终结,这样最终用户就不需要关心终结问题了,一切正常。 sys.set_asyncgen_hooks() 可以结束两个参数 firstiter:一个可调用的,当第一次迭代异步生成器时将调用它。 finalizer:一个可调用的,当异步生成器即将被 GC 时将被调用。 当第一迭代异步生成器时,它会引用到当前的 finalizer。 当异步生成器即将被垃圾收集时,它会调用其缓存的 finalizer。假想在事件循环激活异步生成器开始迭代的时候,finalizer 将调用一个 aclose()方法. 例如,以下是如何修改 asyncio 以允许安全地完成异步生成器:
# asyncio/base_events.py
class BaseEventLoop:
def run_forever(self):
...
old_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
try:
...
finally:
sys.set_asyncgen_hooks(*old_hooks)
...
def _finalize_asyncgen(self, gen):
self.create_task(gen.aclose())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
第二个参数 firstiter,允许事件循环维护在其控制下实例化的弱异步生成器集。这使得可以实现“shutdown”机制,来安全地打开的生成器并关闭事件循环。 sys.set_asyncgen_hooks()是特定线程,因此在多个事件循环并行的时候是安全的。 sys.get_asyncgen_hooks()返回一个带有 firstiter 和 finalizer 字段的类似于类的结构。
# asyncio
asyncio 事件循环将使用 sys.set_asyncgen_hooks()API 来维护所有被调度的弱异步生成器,并在生成器被垃圾回收时侯调度它们的 aclose()方法。 为了确保 asyncio 程序可以可靠地完成所有被调度的异步生成器,我们建议添加一个新的事件循环协程方法 loop.shutdown_asyncgens()。 该方法将使用 aclose()调用关闭所有当前打开的异步生成器。 在调用 loop.shutdown_asyncgens()方法之后,首次迭代新的异步生成器,事件循环就会发出警告。 我们的想法是,在请求关闭所有异步生成器之后,程序不应该执行迭代新异步生成器的代码。 下面是一个关于如何使用 Ashutdown_asyncgens 的例子:
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())#关闭所有异步迭代器
loop.close()
2
3
4
5
# 异步生成器对象
该对象以标准 Python 生成器对象为模型。 本质上异步生成器的行为复制了同步生成器的行为,唯一的区别在于 API 是异步的。 定义了以下方法和属性: 1.agen._aiter_(): 返回 agen. 2.agen._anext_(): 返回一个 awaitable 对象, 调用一次异步生成器的元素。 3.agen.asend(val): 返回一个 awaitable 对象,它在 agen 生成器中推送 val 对象。 当 agen 还没迭代时,val 必须为 None。 上面的方法类似同步生成器的使用。 代码例子:
import asyncio
async def gen():
await asyncio.sleep(0.1)
v = yield 42
print(v)
await asyncio.sleep(0.2)
async def start():
g = gen()
await g.asend(None) # Will return 42 after sleeping
# for 0.1 seconds.
await g.asend('hello') # Will print 'hello' and
# raise StopAsyncIteration
# (after sleeping for 0.2 seconds.)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
4.agen.athrow(typ, [val, [tb]]): 返回一个 awaitable 对象, 这会向 agen 生成器抛出一个异常。 代码如下:
import asyncio
async def gen():
try:
await asyncio.sleep(0.1)
yield 'hello'
except IndexError:
await asyncio.sleep(0.2)
yield 'world'
async def start():
g = gen()
v = await g.asend(None)
print(v) # Will print 'hello' after
# sleeping for 0.1 seconds.
v = await g.athrow(IndexError)
print(v) # Will print 'world' after
# $ sleeping 0.2 seconds.
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
5.agen.aclose(): 返回一个 awaitable 对象, 调用该方法会抛出一个异常给生成器。
import asyncio
async def gen():
try:
await asyncio.sleep(0.1)
v = yield 42
print(v)
await asyncio.sleep(0.2)
except:
print("运行结束")
async def start():
g = gen()
v=await g.asend(None)
print(v)
await g.aclose() #不做异常处理会报错
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
6.agen.__name__
和 agen.__qualname__
:可以返回异步生成器函数的名字。
async def gen():
try:
await asyncio.sleep(0.1)
v = yield 42
print(v)
await asyncio.sleep(0.2)
except:
print("运行结束")
async def start():
g = gen()
print(g.__aiter__())#输出async_generator对象
print(g.__name__)#输出gen
print(g.__qualname__)#输出gen
2
3
4
5
6
7
8
9
10
11
12
13
14
agen.ag_await
:agen
当前正在等待的对象,或None
。 这类似于当前可用于生成器的gi_yieldfrom
和可用于协程的cr_await
。agen.ag_frame
,agen.ag_running
和agen.ag_code
:以与标准生成器的相似属性相同的方式定义。
StopIteration
和 StopAsyncIteration
被替换为 RuntimeError
,并且不上抛。
# 源码实现细节
异步生成器对象(PyAsyncGenObject)与 PyGenObject 共享结构布局。 除此之外,参考实现还引入了三个新对象: PyAsyncGenASend:实现_anext__和 asend()方法的等待对象。 PyAsyncGenAThrow:实现 athrow()和 aclose()方法的等待对象。 PyAsyncGenWrappedValue:来自异步生成器的每个直接生成的对象都隐式地装入此结构中。 这就是生成器实现如何使用常规迭代协议从使用异步迭代协议生成的对象中分离出的对象。 PyAsyncGenASend 和 PyAsyncGenAThrow 是 awaitable 对象(它们有_await__方法返回 self)类似于 coroutine 的对象(实现_iter,_ next_,send()和 throw()方法)。 本质上,它们控制异步生成器的迭代方式
# PyAsyncGenASend 和 PyAsyncGenAThrow
PyAsyncGenASend 类似生成器对象驱动 __anext__
和 asend()
方法,实装了异步迭代协议。
agen.asend(val)
和 agen.__anext__()
返回一个 PyAsyncGenASend 对象的一个引用。 (它将引用保存回父类 agen 对象。)
数据流定义如下:
1.首次调用 PyAsyncGenASend.send(val)时, val 将推入到父类 agen 对象 (PyGenObject 利用现有对象。)
对 PyAsyncGenASend 对象进行后续迭代,将 None 推送到 agen。
2.首次调用_PyAsyncGenWrappedValue 对象时,它将被拆箱,并且以未被装饰的值作为参数会引发 StopIteration 异常。
3.异步生成器中的 return 语句引发 StopAsyncIteration 异常,该异常通过 PyAsyncGenASend.send()和 PyAsyncGenASend.throw()方法传播。
4.PyAsyncGenAThrow 与 PyAsyncGenASend 非常相似。 唯一的区别是 PyAsyncGenAThrow.send()在第一次调用时会向父类 agen 对象抛出异常(而不是将值推入其中。)
# 新的标准库方法和 Types
1.types.AsyncGeneratorType -- 判断是否是异步生成器对象 2.sys.set_asyncgen_hooks()和 sys.get_asyncgen_hooks()-- 在事件循环中设置异步生成器终结器和迭代拦截器。 3.inspect.isasyncgen()和 inspect.isasyncgenfunction() :方法内省。 4.asyncio 加入新方法:loop.shutdown_asyncgens(). 5.collections.abc.AsyncGenerator:抽象基类的添加。
# 是否支持向后兼容
该提案完全支持向后兼容 在 python3.5,async def 里使用 yield 会报错,因此在 python3.6 引入了安全的异步生成器
# 性能展示
# 常规生成器
def gen():
i = 0
while i < 100000000:
yield i
i += 1
if __name__ == '__main__':
list(gen())
2
3
4
5
6
7
8
# 异步迭代器的改进
# 异步迭代器
异步迭代器需求通过__aiter__和__anext__方法自己实现。
import time
import asyncio
N = 10 ** 7
class AIter:
def __init__(self):
self.i = 0
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= N:
raise StopAsyncIteration
self.i += 1
return i
async def start():
[_ async for _ in AIter()]
if __name__ == '__main__':
s=time.time()
loop=asyncio.get_event_loop()
try:
loop.run_until_complete(start())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
e=time.time()
print("total time",e-s)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
输出
total time 5.441649913787842
# 异步生成器
import time
import asyncio
N = 10 ** 7
async def agen():
for i in range(N):
yield i
async def start():
[_ async for _ in agen()]
if __name__ == '__main__':
s=time.time()
loop=asyncio.get_event_loop()
try:
loop.run_until_complete(start())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
e=time.time()
print("total time",e-s)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
输出
total time 2.1055827140808105
基准测试表明异步生成器的速度比异步迭代器快了两倍多。
# 设计中要注意的事项
内建函数:aiter() and anext() 最初,PEP 492 将__aiter__定义为应返回等待对象的方法,从而产生异步迭代器。 但是,在 CPython 3.5.2 中,重新定义了__aiter__可以直接返回异步迭代器。 为了避免破坏向后兼容性,决定 Python 3.6 将支持两种方式:__aiter__仍然可以在发出 DeprecationWarning 时返回等待状态。由于 Python 3.6 中__aiter__的这种双重性质,我们无法添加内置的 aiter()的同步实现。 因此,建议等到 Python 3.7。
# 异步 list/dict/set 推导式
将放在单独的 pep 中也就是后来的 pep530.
# 异步 yield from
对于异步生成器,yield from 也不那么重要,因为不需要提供在协程之上实现另一个协同程序协议的机制。为了组合异步生成器,可以使用 async for 简化这个过程:
async def g1():
yield 1
yield 2
async def g2():
async for v in g1():
yield v
2
3
4
5
6
7
# 为了 asend()和 athrow()是必须的
它们可以使用异步生成器实现类似于 contextlib.contextmanager 的概念。 例如,可以实现以下模式:
@async_context_manager
async def ctx():
await open()
try:
yield
finally:
await close()
async with ctx():
await ...
2
3
4
5
6
7
8
9
10
另一个原因是从__anext__对象返回的对象来推送数据并将异常抛出到异步生成器中,很难正确地执行此操作。 添加显式的 asend()和 athrow()更获取异常后的数据。 在实现方面,asend()是__anext__更通用的版本,而 athrow()与 aclose()非常相似。 因此,为异步生成器定义这些方法不会增加任何额外的复杂性。
# 代码示例
async def ticker(delay, to):
for i in range(to):
yield i
await asyncio.sleep(delay)
async def run():
async for i in ticker(1, 10):
print(i)
import asyncio
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(run())
finally:
loop.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
这代码将打出 0-9,每个数字之间的间隔为 1s。
# 提议者
Guido, 2016 年 9 月 6 日
# 参考资料
[1] https://github.com/1st1/cpython/tree/async_gen [2] https://mail.python.org/pipermail/python-dev/2016-September/146267.html [3] http://bugs.python.org/issue28003
# 版权声明
本文章翻译整理自,pep525 Source: https://github.com/python/peps/blob/master/pep-0525.txt 翻译能力有限还请大家多多指教。