平凡之路——asyncio 的演进
如今,地球上最发达、规模最庞大的计算机程序,莫过于因特网。而从 CPU 的时间观中可知,网络 I/O 是最大的 I/O 瓶颈,除了宕机没有比它更慢的。所以,诸多异步框架都对准的是网络 I/O。
我们从一个爬虫例子说起,从因特网上下载 10 篇网页。
# 同步阻塞方式
最容易想到的解决方案就是依次下载,从建立 socket 连接到发送网络请求再到读取响应数据,顺序进行。
#!/usr/bin/env python
# encoding: utf-8
import socket
import time
from concurrent import futures
def blocking_way():
"""
建立 socket 连接,发送HTTP请求,然后从 socket读取HTTP响应并返回数据。示例中我们请求 example.com 的首页。
"""
sock = socket.socket()
# 以blocking的方式向指定网址80端口发送网络连接请求
sock.connect(('example.com', 80))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('ascii'))
response = b''
# 从socket上读取4K字节数据
chunk = sock.recv(4096)
while chunk:
response += chunk
# blocking
chunk = sock.recv(4096)
return response
def sync_way():
"""
执行10次,返回结果
"""
res = []
for _ in range(10):
res.append(blocking_way())
return len(res)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
注:总体耗时约为 4.5 秒。(因网络波动每次测试结果有所变动,本文取多次平均值)
我们知道,创建网络连接,多久能创建完成不是客户端决定的,而是由网络状况和服务端处理能力共同决定。服务端什么时候返回了响应数据并被客户端接收到可供程序读取,也是不可预测的。所以 sock.connect()和 sock.recv()这两个调用在默认情况下是阻塞的。
注:
sock.send()
函数并不会阻塞太久,它只负责将请求数据拷贝到 TCP/IP 协议栈的系统缓冲区中就返回,并不等待服务端返回的应答确认。
假设网络环境很差,创建网络连接需要 1 秒钟,那么sock.connect()
就得阻塞 1 秒钟,等待网络连接成功。这 1 秒钟对一颗 2.6GHz 的 CPU 来讲,仿佛过去了 83 年,然而它不能干任何事情。sock.recv()
也是一样的,必须得等到服务端的响应数据已经被客户端接收。我们下载 10 篇网页,这个阻塞过程就得重复 10 次。如果一个爬虫系统每天要下载 1000 万篇网页呢?!
上面说了这么多,我们力图引出一个问题:同步阻塞的网络交互方式,效率低十分低下。 特别是在网络交互频繁的程序中。这种方式根本 不可能 挑战 C10K/C10M。
# 改进方式:多进程
在一个程序内,依次执行 10 次太耗时,那开 10 个一样的程序同时执行不就行了。于是我们想到了多进程编程。为什么会先想到多进程呢?发展脉络如此。在更早的操作系统(Linux 2.4)及其以前,进程是 OS 调度任务的实体,是面向进程设计的 OS。
def process_way():
"""
进程池方式
:return:
"""
workers = 10
with futures.ProcessPoolExecutor(workers) as executor:
futs = {executor.submit(blocking_way) for _ in range(try_count)}
return len([fut.result() for fut in futs])
2
3
4
5
6
7
8
9
注:总体耗时约为 0.6 秒。
改善效果立竿见影。但仍然有问题。总体耗时并没有缩减到原来的十分之一,而是九分之一左右,还有一些时间耗到哪里去了——进程切换开销。
# count = 10
(10, 'sync_way', 0.3614327907562256)
(10, 'thread_way', 0.05895590782165527)
(10, 'process_way', 0.6955974102020264)
# count = 80
(80, 'sync_way', 5.538213014602661)
(80, 'thread_way', 1.2612965106964111)
(80, 'process_way', 1.0214107036590576)
2
3
4
5
6
7
8
注意
经本人实测,当只尝试 10 次连接时,进程池并没有达到所说的缩短时间的目的。当连接请求次数够多时,我们才能看到进程池的效果符合预期。
进程切换开销不止像“CPU 的时间观”所列的“上下文切换”那么低。CPU 从一个进程切换到另一个进程,需要把旧进程运行时的寄存器状态、内存状态全部保存好,再将另一个进程之前保存的数据恢复。对 CPU 来讲,几个小时就干等着。当进程数量大于 CPU 核心数量时,进程切换是必然需要的。
除了切换开销,多进程还有另外的缺点。一般的服务器在能够稳定运行的前提下,可以同时处理的进程数在数十个到数百个规模。如果进程数量规模更大,系统运行将不稳定,而且可用内存资源往往也会不足。
多进程解决方案在面临每天需要成百上千万次下载任务的爬虫系统,或者需要同时搞定数万并发的电商系统来说,并不适合。
除了切换开销大,以及 可支持的任务规模小 之外,多进程还有其他缺点,如状态共享等问题,后文会有提及,此处不再细究。
# 继续改进:多线程
由于线程的数据结构比进程更轻量级,同一个进程可以容纳多个线程,从进程到线程的优化由此展开。后来的 OS 也把调度单位由进程转为线程,进程只作为线程的容器,用于管理进程所需的资源。而且 OS 级别的线程是可以被分配到不同的 CPU 核心同时运行的。
def thread_way():
"""
线程池方式
"""
workers = 10
with futures.ThreadPoolExecutor(workers) as executor:
futs = {executor.submit(blocking_way) for _ in range(try_count)}
return len([fut.result() for fut in futs])
2
3
4
5
6
7
8
注:总体运行时间约 0.43 秒。
结果符合预期,比多进程耗时要少些。从运行时间上看,多线程似乎已经解决了切换开销大的问题。而且可支持的任务数量规模,也变成了数百个到数千个。
但是,多线程仍有问题,特别是 Python 里的多线程。首先,Python 中的多线程因为 GIL 的存在,它们并不能利用 CPU 多核优势,一个 Python 进程中,只允许有一个线程处于运行状态。那为什么结果还是如预期,耗时缩减到了十分之一?
因为在做阻塞的系统调用时,例如sock.connect()
和sock.recv()
时,当前线程会释放 GIL,让别的线程有机会执行。但是单个线程内,在调用上还是阻塞的。
INFO
Python 中 time.sleep
是阻塞的,都知道使用它要谨慎,但在多线程编程中,time.sleep
并不会阻塞其他线程。
除了 GIL 之外,所有的多线程还有通病。它们是被 OS 调度,调度策略是抢占式的,以保证同等优先级的线程都有均等的执行机会,那带来的问题是:并不知道下一时刻是哪个线程被运行,也不知道它正要执行的代码是什么。所以就可能存在竞态条件。
例如爬虫工作线程从任务队列拿待抓取 URL 的时候,如果多个爬虫线程同时来取,那这个任务到底该给谁?那就需要用到“锁”或“同步队列”来保证下载任务不会被重复执行。
而且线程支持的多任务规模,在数百到数千的数量规模。在大规模的高频网络交互系统中,仍然有些吃力。当然,多线程最主要的问题还是竞态条件。
# 线程池与多线程的区别
- 线程池是在程序运行开始,创建好的 n 个线程,并且这 n 个线程挂起等待任务的到来。而多线程是在任务到来的时候进行创建,然后执行任务。
- 线程池中的线程执行完之后不会回收线程,会继续将线程放在等待队列中;多线程程序在每次任务完成之后会回收该线程。
- 由于线程池中线程是创建好的,所以在效率上相对于多线程会高很多。
- 线程池也在高并发的情况下有着较好的性能;不容易挂掉。多线程在创建线程数较多的情况下,很容易挂掉。
# 非阻塞方式
终于,我们来到了非阻塞解决方案。先来看看最原始的非阻塞如何工作的。
def non_blocking_way():
sock = socket.socket()
# https://docs.python.org/zh-cn/3/library/socket.html#socket.socket.setblocking
sock.setblocking(False)
try:
sock.connect(('example.com', 80))
except BlockingIOError:
# 非阻塞连接过程中也会抛出异常
pass
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
data = request.encode('ascii')
# 不知道socket何时就绪,所以不断尝试发送
while True:
try:
sock.send(data)
# 直到send不抛异常,则发送完成
break
except OSError:
pass
response = b''
while True:
try:
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
break
except OSError:
pass
return response
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
注:总体耗时约 4.3 秒。
首先注意到两点,就感觉被骗了。一是耗时与同步阻塞相当,二是代码更复杂。坑爹呢?要非阻塞何用!且慢。
注意
在我的实验中,非阻塞消耗明显大于阻塞消耗。
blocking_way (10, 'sync_way', 0.1429271697998047)
non_blocking_way (10, 'sync_way', 8.619038343429565)
2
上图第 9 行代码sock.setblocking(False)
告诉 OS,让 socket 上阻塞调用都改为非阻塞的方式。之前我们说到,非阻塞就是在做一件事的时候,不阻碍调用它的程序做别的事情。上述代码在执行完 sock.connect()
和 sock.recv()
后的确不再阻塞,可以继续往下执行请求准备的代码或者是执行下一次读取。
代码变得更复杂也是上述原因所致。第 11 行要放在try
语句内,是因为socket
在发送非阻塞连接请求过程中,系统底层也会抛出异常。connect()
被调用之后,立即可以往下执行第 15 和 16 行的代码。
需要 while 循环不断尝试 send()
,是因为connect()
已经非阻塞,在send()
之时并不知道 socket
的连接是否就绪,只有不断尝试,尝试成功为止,即发送数据成功了。recv()
调用也是同理。
虽然 connect() 和 recv() 不再阻塞主程序,空出来的时间段 CPU 没有空闲着,但并没有利用好这空闲去做其他有意义的事情,而是在循环尝试读写 socket (不停判断非阻塞调用的状态是否就绪)。还得处理来自底层的可忽略的异常。也不能同时处理多个 socket 。
然后 10 次下载任务仍然按序进行。所以总体执行时间和同步阻塞相当。如果非得这样子,那还不如同步阻塞算了。
# 非阻塞改进
# epoll
判断非阻塞调用是否就绪如果 OS 能做,是不是应用程序就可以不用自己去等待和判断了,就可以利用这个空闲去做其他事情以提高效率。
所以OS 将 I/O 状态的变化都封装成了事件,如可读事件、可写事件。并且提供了专门的系统模块让应用程序可以接收事件通知。这个模块就是select
。让应用程序可以通过select
注册文件描述符和回调函数。当文件描述符的状态发生变化时,select
就调用事先注册的回调函数。
select
因其算法效率比较低,后来改进成了poll
,再后来又有进一步改进,BSD 内核改进成了kqueue
模块,而 Linux 内核改进成了epoll
模块。这四个模块的作用都相同,暴露给程序员使用的 API 也几乎一致,区别在于kqueue
和 epoll
在处理大量文件描述符时效率更高。
鉴于 Linux 服务器的普遍性,以及为了追求更高效率,所以我们常常听闻被探讨的模块都是 epoll
。
# 回调(Callback)
回调函数可以理解为是 IO 事件完毕后执行提前注册的回调函数。
把 I/O 事件的等待和监听任务交给了 OS,那 OS 在知道 I/O 状态发生改变后(例如 socket 连接已建立成功可发送数据),它又怎么知道接下来该干嘛呢?只能回调。
需要我们将发送数据与读取数据封装成独立的函数,让epoll
代替应用程序监听socket
状态时,得告诉epoll
:“如果socket
状态变为可以往里写数据(连接建立成功了),请调用 HTTP 请求发送函数。如果socket
变为可以读数据了(客户端已收到响应),请调用响应处理函数。”
于是我们利用epoll
结合回调机制重构爬虫代码:
#!/usr/bin/env python
# encoding: utf-8
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
selector = DefaultSelector()
stopped = False
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
class Crawler:
def __init__(self, url):
self.url = url
self.sock = None
self.response = b''
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('example.com', 80))
except BlockingIOError:
pass
selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
def connected(self, key, mask):
selector.unregister(key.fd)
get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url)
self.sock.send(get.encode('ascii'))
selector.register(key.fd, EVENT_READ, self.read_response)
def read_response(self, key, mask):
global stopped
# 如果响应大于4KB,下一次循环会继续读
chunk = self.sock.recv(4096)
if chunk:
self.response += chunk
else:
selector.unregister(key.fd)
urls_todo.remove(self.url)
if not urls_todo:
stopped = True
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
此处和前面稍有不同的是,我们将下载不同的 10 个页面,相对 URL 路径存放于 urls_todo 集合中。现在看看改进在哪。
首先,不断尝试send()
和 recv()
的两个循环被消灭掉了。
其次,导入了selectors
模块,并创建了一个DefaultSelector
实例。Python 标准库提供的selectors
模块是对底层select/poll/epoll/kqueue
的封装。DefaultSelector
类会根据 OS 环境自动选择最佳的模块,那在 Linux 2.5.44 及更新的版本上都是epoll
了。
然后,在第 25 行和第 31 行分别注册了socket
可写事件(EVENT_WRITE)
和可读事件(EVENT_READ)
发生后应该采取的回调函数。
虽然代码结构清晰了,阻塞操作也交给 OS 去等待和通知了,但是,我们要抓取 10 个不同页面,就得创建 10 个Crawler
实例,就有 20 个事件将要发生,那如何从selector
里获取当前正发生的事件,并且得到对应的回调函数去执行呢?
# 事件循环(Event Loop)
事件循环 “是一种等待程序分配事件或消息的编程架构”。基本上来说事件循环就是,“当 A 发生时,执行 B”。 事件循环提供一种循环机制,让你可以“在 A 发生时,执行 B”。基本上来说事件循环就是监听当有什么发生时,同时事件循环也关心这件事并执行相应的代码。事件循环被认为是一种循环是因为它不停地收集事件并通过循环来查找如何应对这些事件。对 Python 来说,用来提供事件循环的 asyncio 被加入标准库中。asyncio 重点解决网络服务中的问题,事件循环在这里将来自 socket 的 I/O 已经准备好读和/或写作为“当 A 发生时”(通过selectors
模块)。
为了解决上述问题,那我们只得采用老办法,写一个循环,去访问 selector 模块,等待它告诉我们当前是哪个事件发生了,应该对应哪个回调。这个等待事件通知的循环,称之为事件循环。
def loop():
while not stopped:
# 阻塞, 直到一个事件发生
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback(event_key, event_mask)
2
3
4
5
6
7
上述代码中,我们用stopped
全局变量控制事件循环何时停止。当urls_todo
消耗完毕后,会标记stopped
为True
。
重要的是第 49 行代码,selector.select()
是一个阻塞调用,因为如果事件不发生,那应用程序就没事件可处理,所以就干脆阻塞在这里等待事件发生。那可以推断,如果只下载一篇网页,一定要connect()
之后才能send()
继而recv()
,那它的效率和阻塞的方式是一样的。因为不在connect()/recv()
上阻塞,也得在select()
上阻塞。
所以,selector 机制(后文以此称呼代指 epoll/kqueue)是设计用来解决大量并发连接的。当系统中有大量非阻塞调用,能随时产生事件的时候,selector
机制才能发挥最大的威力。
下面是如何启创建 10 个下载任务和启动事件循环的:
if __name__ == '__main__':
import time
start = time.time()
for url in urls_todo:
crawler = Crawler(url)
crawler.fetch()
loop()
print(time.time() - start)
2
3
4
5
6
7
8
9
注:总体耗时约 0.45 秒。
上述执行结果令人振奋。在单线程内用 事件循环+回调 搞定了 10 篇网页同时下载的问题。这,已经是异步编程了。虽然有一个 for 循环顺序地创建 Crawler 实例并调用fetch
方法,但是fetch
内仅有connect()
和注册可写事件,而且从执行时间明显可以推断,多个下载任务确实在同时进行!
上述代码异步执行的过程:
- 创建
Crawler
实例; - 调用
fetch
方法,会创建socket
连接和在selector
上注册可写事件; - fetch 内并无阻塞操作,该方法立即返回;
- 重复上述 3 个步骤,将 10 个不同的下载任务都加入事件循环;
- 启动事件循环,进入第 1 轮循环,阻塞在事件监听上;
- 当某个下载任务
EVENT_WRITE
被触发,回调其connected
方法,第一轮事件循环结束; - 进入第 2 轮事件循环,当某个下载任务有事件触发,执行其回调函数;此时已经不能推测是哪个事件发生,因为有可能是上次
connected
里的EVENT_READ
先被触发,也可能是其他某个任务的EVENT_WRITE
被触发;(此时,原来在一个下载任务上会阻塞的那段时间被利用起来执行另一个下载任务了) - 循环往复,直至所有下载任务被处理完成
- 退出事件循环,结束整个下载程序
# 总结
目前为止,我们已经从同步阻塞学习到了异步非阻塞。掌握了在单线程内同时并发执行多个网络 I/O 阻塞型任务的黑魔法。而且与多线程相比,连线程切换都没有了,执行回调函数是函数调用开销,在线程的栈内完成,因此性能也更好,单机支持的任务规模也变成了数万到数十万个。(不过我们知道:没有免费午餐,也没有银弹。)
部分编程语言中,对异步编程的支持就止步于此(不含语言官方之外的扩展)。需要程序猿直接使用 epoll 去注册事件和回调、维护一个事件循环,然后大多数时间都花在设计回调函数上。
通过本节的学习,我们应该认识到,不论什么编程语言,但凡要做异步编程,上述的“事件循环+回调”这种模式是逃不掉的,尽管它可能用的不是epoll
,也可能不是while
循环。如果你找到了一种不属于 “等会儿告诉你” 模型的异步方式,请立即给我打电话(注意,打电话是 Call)。
为什么我们在某些异步编程中并没有看到 CallBack 模式呢?这就是我们接下来要探讨的问题。本节是学习异步编程的一个终点,也是另一个起点。毕竟咱们讲 Python 异步编程,还没提到其主角协程的用武之地。
# Python 对异步 I/O 的优化之路
我们将在本节学习到 Python 生态对异步编程的支持是如何继承前文所述的“事件循环+回调”模式演变到asyncio
的原生协程模式。
# 回调之痛,以终为始
在第 3 节中,我们已经学会了“事件循环+回调”的基本运行原理,可以基于这种方式在单线程内实现异步编程。也确实能够大大提高程序运行效率。但是,刚才所学的只是最基本的,然而在生产项目中,要应对的复杂度会大大增加。考虑如下问题:
- 如果回调函数执行不正常该如何?
- 如果回调里面还要嵌套回调怎么办?要嵌套很多层怎么办?
- 如果嵌套了多层,其中某个环节出错了会造成什么后果?
- 如果有个数据需要被每个回调都处理怎么办?
- ……
在实际编程中,上述系列问题不可避免。在这些问题的背后隐藏着回调编程模式的一些缺点:
- 回调层次过多时代码可读性差
def callback_1():
# processing ...
def callback_2():
# processing.....
def callback_3():
# processing ....
def callback_4():
#processing .....
def callback_5():
# processing ......
async_function(callback_5)
async_function(callback_4)
async_function(callback_3)
async_function(callback_2)
async_function(callback_1)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- 破坏代码结构
写同步代码时,关联的操作时自上而下运行:
do_a()
do_b()
2
如果 b 处理依赖于 a 处理的结果,而 a 过程是异步调用,就不知 a 何时能返回值,需要将后续的处理过程以 callback 的方式传递给 a ,让 a 执行完以后可以执行 b。代码变化为:
do_a(do_b())
如果整个流程中全部改为异步处理,而流程比较长的话,代码逻辑就会成为这样:
do_a(do_b(do_c(do_d(do_e(do_f(......))))))
上面实际也是回调地狱式的风格,但这不是主要矛盾。主要在于,原本从上而下的代码结构,要改成从外到内的。先 a,再 b,再 c,……,直到最内层 f 执行完成。在同步版本中,执行完 a 后执行 b,这是线程的指令指针控制着的流程,而在回调版本中,流程就是程序猿需要注意和安排的。
共享状态管理困难
回顾第 3 节爬虫代码,同步阻塞版的sock
对象从头使用到尾,而在回调的版本中,我们必须在Crawler
实例化后的对象self
里保存它自己的sock
对象。如果不是采用 OOP 的编程风格,那需要把要共享的状态接力似的传递给每一个回调。多个异步调用之间,到底要共享哪些状态,事先就得考虑清楚,精心设计。错误处理困难
一连串的回调构成一个完整的调用链。例如上述的 a 到 f。假如 d 抛了异常怎么办?整个调用链断掉,接力传递的状态也会丢失,这种现象称为调用栈撕裂。 c 不知道该干嘛,继续异常,然后是 b 异常,接着 a 异常。好嘛,报错日志就告诉你,a 调用出错了,但实际是 d 出错。所以,为了防止栈撕裂,异常必须以数据的形式返回,而不是直接抛出异常,然后每个回调中需要检查上次调用的返回值,以防错误吞没。
如果说代码风格难看是小事,但栈撕裂和状态共享困难这两个缺点会让基于回调的异步编程很艰难。所以不同编程语言的生态都在致力于解决这个问题。才诞生了后来的Promise
、Co-routine
等解决方案。
Python 生态也以终为始,秉承着“程序猿不必难程序猿”的原则,让语言和框架开发者苦逼一点,也要让应用开发者舒坦。在事件循环+回调的基础上衍生出了基于协程的解决方案,代表作有 Tornado、Twisted、asyncio 等。接下来我们随着 Python 生态异步编程的发展过程,深入理解 Python 异步编程。
# 4.2 核心问题
通过前面的学习,我们清楚地认识到异步编程最大的困难:异步任务何时执行完毕?接下来要对异步调用的返回结果做什么操作?
上述问题我们已经通过事件循环和回调解决了。但是回调会让程序变得复杂。要异步,必回调,又是否有办法规避其缺点呢?那需要弄清楚其本质,为什么回调是必须的?还有使用回调时克服的那些缺点又是为了什么?
答案是程序为了知道自己已经干了什么?正在干什么?将来要干什么?换言之,程序得知道当前所处的状态,而且要将这个状态在不同的回调之间延续下去。
多个回调之间的状态管理困难,那让每个回调都能管理自己的状态怎么样?链式调用会有栈撕裂的困难,让回调之间不再链式调用怎样?不链式调用的话,那又如何让被调用者知道已经完成了?那就让这个回调通知那个回调如何?而且一个回调,不就是一个待处理任务吗?
任务之间得相互通知,每个任务得有自己的状态。那不就是很古老的编程技法:协作式多任务?然而要在单线程内做调度,啊哈,协程! 每个协程具有自己的栈帧,当然能知道自己处于什么状态,协程之间可以协作那自然可以通知别的协程。
# 4.3 协程
- 协程(Co-routine),即是协作式的例程。
它是非抢占式的多任务子例程的概括,可以允许有多个入口点在例程中确定的位置来控制程序的暂停与恢复执行。
例程是什么?编程语言定义的可被调用的代码段,为了完成某个特定功能而封装在一起的一系列指令。一般的编程语言都用称为函数或方法的代码结构来体现。
# 4.4 基于生成器的协程
早期的 Pythoner 发现 Python 中有种特殊的对象——生成器(Generator),它的特点和协程很像。每一次迭代之间,会暂停执行,继续下一次迭代的时候还不会丢失先前的状态。
为了支持用生成器做简单的协程,Python 2.5 对生成器进行了增强(PEP 342),该增强提案的标题是 “Coroutines via Enhanced Generators”。有了 PEP 342 的加持,生成器可以通过yield
暂停执行和向外返回数据,也可以通过send()
向生成器内发送数据,还可以通过throw()
向生成器内抛出异常以便随时终止生成器的运行。
接下来,我们用基于生成器的协程来重构先前的爬虫代码。
# 4.4.1 未来对象(Future)
不用回调的方式了,怎么知道异步调用的结果呢?先设计一个对象,异步调用执行完的时候,就把结果放在它里面。这种对象称之为未来对象。
selector = DefaultSelector()
stopped = False
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
class Future:
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
self.result = result
for fn in self._callbacks:
fn(self)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
未来对象有一个result
属性,用于存放未来的执行结果。还有个set_result()
方法,是用于设置result
的,并且会在给result
绑定值以后运行事先给future
添加的回调。回调是通过未来对象的add_done_callback()
方法添加的。
不要疑惑此处的callback
,说好了不回调的嘛?难道忘了我们曾经说的要异步,必回调。不过也别急,此处的回调,和先前学到的回调,还真有点不一样。
# 4.4.2 重构 Crawler
现在不论如何,我们有了未来对象可以代表未来的值。先用Future
来重构爬虫代码。
和先前的回调版本对比,已经有了较大差异。fetch
方法内有了yield
表达式,使它成为了生成器。我们知道生成器需要先调用next()
迭代一次或者是先send(None)
启动,遇到yield
之后便暂停。那这fetch
生成器如何再次恢复执行呢?至少 Future
和 Crawler
都没看到相关代码。
# 4.4.3 任务对象(Task)
为了解决上述问题,我们只需遵循一个编程规则:单一职责,每种角色各司其职,如果还有工作没有角色来做,那就创建一个角色去做。没人来恢复这个生成器的执行么?没人来管理生成器的状态么?创建一个,就叫Task
好了,很合适的名字。
上述代码中 Task 封装了coro
对象,即初始化时传递给他的对象,被管理的任务是待执行的协程,故而这里的coro
就是fetch()
生成器。它还有个step()
方法,在初始化的时候就会执行一遍。step()
内会调用生成器的send()
方法,初始化第一次发送的是None
就驱动了coro
即fetch()
的第一次执行。
send()
完成之后,得到下一次的future
,然后给下一次的future
添加step()
回调。原来add_done_callback()
不是给写爬虫业务逻辑用的。此前的callback
可就干的是业务逻辑呀。
再看fetch()
生成器,其内部写完了所有的业务逻辑,包括如何发送请求,如何读取响应。而且注册给selector
的回调相当简单,就是给对应的future
对象绑定结果值。两个yield
表达式都是返回对应的future
对象,然后返回Task.step()
之内,这样Task
, Future
, Coroutine
三者精妙地串联在了一起。
初始化Task
对象以后,把fetch()
给驱动到了第 44 行yied f
就完事了,接下来怎么继续?
# 4.4.4 事件循环(Event Loop)驱动协程运行
该事件循环上场了。接下来,只需等待已经注册的EVENT_WRITE
事件发生。事件循环就像心脏一般,只要它开始跳动,整个程序就会持续运行。
注:总体耗时约 0.43 秒。
现在loop
有了些许变化,callback()
不再传递event_key
和event_mask
参数。也就是说,这里的回调根本不关心是谁触发了这个事件,结合fetch()
可以知道,它只需完成对future
设置结果值即可f.set_result()
。而且future
是谁它也不关心,因为协程能够保存自己的状态,知道自己的future
是哪个。也不用关心到底要设置什么值,因为要设置什么值也是协程内安排的。
此时的loop()
,真的成了一个心脏,它只管往外泵血,不论这份血液是要输送给大脑还是要给脚趾,只要它还在跳动,生命就能延续。
# 4.4.5 生成器协程风格和回调风格对比总结
在回调风格中:
- 存在链式回调(虽然示例中嵌套回调只有一层)
- 请求和响应也不得不分为两个回调以至于破坏了同步代码那种结构
- 程序员必须在回调之间维护必须的状态。
还有更多示例中没有展示,但确实存在的问题,参见 4.1 节。
而基于生成器协程的风格:
- 无链式调用
selector
的回调里只管给future
设置值,不再关心业务逻辑loop
内回调callback()
不再关注是谁触发了事件- 已趋近于同步代码的结构
- 无需程序员在多个协程之间维护状态,例如哪个才是自己的
sock
# 4.4.6 碉堡了,但是代码很丑!能不能重构
如果说fetch
的容错能力要更强,业务功能也需要更完善,怎么办?而且技术处理的部分(socket 相关的)和业务处理的部分(请求与返回数据的处理)混在一起。
- 创建
socket
连接可以抽象复用吧? - 循环读取整个
response
可以抽象复用吧? - 循环内处理
socket.recv()
的可以抽象复用吧?
但是这些关键节点的地方都有yield
,抽离出来的代码也需要是生成器。而且fetch()
自己也得是生成器。生成器里玩生成器,代码好像要写得更丑才可以……
Python 语言的设计者们也认识到了这个问题,再次秉承着“程序猿不必为难程序猿”的原则,他们捣鼓出了一个yield from
来解决生成器里玩生成器的问题。
# 4.5 用 yield from 改进生成器协程
# 4.5.1 yield from
语法介绍
yield from
是 Python 3.3 新引入的语法(PEP 380)。它主要解决的就是在生成器里玩生成器不方便的问题。它有两大主要功能。
第一个功能是:让嵌套生成器不必通过循环迭代yield
,而是直接yield from
。以下两种在生成器里玩子生成器的方式是等价的。
def gen_one():
subgen = range(10)
yield from subgen
def gen_two():
subgen = range(10)
for item in subgen:
yield item
2
3
4
5
6
7
8
第二个功能就是在子生成器和原生成器的调用者之间打开双向通道,两者可以直接通信。
def gen():
yield from subgen()
def subgen():
while True:
x = yield
yield x+1
def main():
g = gen()
next(g) # 驱动生成器g开始执行到第一个 yield
retval = g.send(1) # 看似向生成器 gen() 发送数据
print(retval) # 返回2
g.throw(StopIteration) # 看似向gen()抛入异常
2
3
4
5
6
7
8
9
10
11
12
13
14
通过上述代码清晰地理解了yield from
的双向通道功能。关键字yield from
在gen()
内部为subgen()
和main()
开辟了通信通道。main()
里可以直接将数据1
发送给subgen()
,subgen()
也可以将计算后的数据2
返回到main()
里,main()
里也可以直接向subgen()
抛入异常以终止subgen()
。
顺带一提,yield from
除了可以 yield from <generator>
还可以 yield from <iterable>
。
# 4.5.2 重构代码
抽象 socket 连接的功能:
def connect(sock, address):
f = Future()
sock.setblocking(False)
try:
sock.connect(address)
except BlockingIOError:
pass
def on_connected():
f.set_result(None)
selector.register(sock.fileno(), EVENT_WRITE, on_connected)
yield from f
selector.unregister(sock.fileno())
2
3
4
5
6
7
8
9
10
11
12
13
14
抽象单次recv()
和读取完整的 response 功能:
def read(sock):
f = Future()
def on_readable():
f.set_result(sock.recv(4096))
selector.register(sock.fileno(), EVENT_READ, on_readable)
chunk = yield from f
selector.unregister(sock.fileno())
return chunk
def read_all(sock):
response = []
chunk = yield from read(sock)
while chunk:
response.append(chunk)
chunk = yield from read(sock)
return b''.join(response)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
三个关键点的抽象已经完成,现在重构Crawler
类:
class Crawler:
def __init__(self, url):
self.url = url
self.response = b''
def fetch(self):
global stopped
sock = socket.socket()
yield from connect(sock, ('example.com', 80))
get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url)
sock.send(get.encode('ascii'))
self.response = yield from read_all(sock)
urls_todo.remove(self.url)
if not urls_todo:
stopped = True
2
3
4
5
6
7
8
9
10
11
12
13
14
15
上面代码整体来讲没什么问题,可复用的代码已经抽象出去,作为子生成器也可以使用 yield from
语法来获取值。但另外有个点需要注意:在第 24 和第 35 行返回 future 对象的时候,我们了yield from f
而不是原来的yield f
。yield
可以直接作用于普通 Python 对象,而yield from
却不行,所以我们对Future
还要进一步改造,把它变成一个iterable
对象就可以了。
class Future:
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
self.result = result
for fn in self._callbacks:
fn(self)
def __iter__(self):
yield self
return self.result
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
只是增加了__iter__()
方法的实现。如果不把Future
改成iterable
也是可以的,还是用原来的yield f
即可。那为什么需要改进呢?
首先,我们是在基于生成器做协程,而生成器还得是生成器,如果继续混用yield
和yield from
做协程,代码可读性和可理解性都不好。其次,如果不改,协程内还得关心它等待的对象是否可被yield
,如果协程里还想继续返回协程怎么办?如果想调用普通函数动态生成一个Future
对象再返回怎么办?
所以,在 Python 3.3 引入yield from
新语法之后,就不再推荐用yield
去做协程。全都使用yield from
由于其双向通道的功能,可以让我们在协程间随心所欲地传递数据。
# 4.5.3 yield from
改进协程总结
用yield from
改进基于生成器的协程,代码抽象程度更高。使业务逻辑相关的代码更精简。由于其双向通道功能可以让协程之间随心所欲传递数据,使 Python 异步编程的协程解决方案大大向前迈进了一步。
于是 Python 语言开发者们充分利用yield from
,使 Guido 主导的 Python 异步编程框架Tulip
迅速脱胎换骨,并迫不及待得让它在 Python 3.4 中换了个名字asyncio
以“实习生”角色出现在标准库中。
# 4.5.4 asyncio 介绍
asyncio
是 Python 3.4 试验性引入的异步 I/O 框架(PEP 3156),提供了基于协程做异步 I/O 编写单线程并发代码的基础设施。其核心组件有事件循环(Event Loop)、协程(Coroutine)、任务(Task)、未来对象(Future)以及其他一些扩充和辅助性质的模块。
在引入asyncio
的时候,还提供了一个装饰器@asyncio.coroutine
用于装饰使用了yield from
的函数,以标记其为协程。但并不强制使用这个装饰器。
虽然发展到 Python 3.4 时有了yield from
的加持让协程更容易了,但是由于协程在 Python 中发展的历史包袱所致,很多人仍然弄不明白生成器和协程的联系与区别,也弄不明白yield
和 yield from
的区别。这种混乱的状态也违背 Python 之禅的一些准则。
于是 Python 设计者们又快马加鞭地在 3.5 中新增了async/await
语法(PEP 492),对协程有了明确而显式的支持,称之为原生协程。async/await
和 yield from
这两种风格的协程底层复用共同的实现,而且相互兼容。
在 Python 3.6 中asyncio
库“转正”,不再是实验性质的,成为标准库的正式一员。
# 4.6 总结
行至此处,我们已经掌握了asyncio
的核心原理,学习了它的原型,也学习了异步 I/O 在 CPython 官方支持的生态下是如何一步步发展至今的。
实际上,真正的asyncio
比我们前几节中学到的要复杂得多,它还实现了零拷贝、公平调度、异常处理、任务状态管理等等使 Python 异步编程更完善的内容。理解原理和原型对我们后续学习有莫大的帮助。
# asyncio 和原生协程初体验
本节中,我们将初步体验asyncio
库和新增语法async/await
给我们带来的便利。由于 Python2-3 的过度期间,Python3.0-3.4 的使用者并不是太多,也为了不让更多的人困惑,也因为aysncio
在 3.6 才转正,所以更深入学习asyncio
库的时候我们将使用async/await
定义的原生协程风格,yield from
风格的协程不再阐述(实际上它们可用很小的代价相互代替)。
对比生成器版的协程,使用 asyncio 库后变化很大:
- 没有了
yield
或yield from
,而是async/await
- 没有了自造的
loop()
,取而代之的是asyncio.get_event_loop()
- 无需自己在 socket 上做异步操作,不用显式地注册和注销事件,
aiohttp
库已经代劳 - 没有了显式的
Future
和Task
,asyncio
已封装 - 更少量的代码,更优雅的设计
说明:我们这里发送和接收 HTTP 请求不再自己操作socket
的原因是,在实际做业务项目的过程中,要处理妥善地 HTTP 协议会很复杂,我们需要的是功能完善的异步 HTTP 客户端,业界已经有了成熟的解决方案,DRY 不是吗?
和同步阻塞版的代码对比:
- 异步化
- 代码量相当(引入 aiohttp 框架后更少)
- 代码逻辑同样简单,跟同步代码一样的结构、一样的逻辑
- 接近 10 倍的性能提升
# 结语
到此为止,我们已经深入地学习了异步编程是什么、为什么、在 Python 里是怎么样发展的。我们找到了一种让代码看起来跟同步代码一样简单,而效率却提升 N 倍(具体提升情况取决于项目规模、网络环境、实现细节)的异步编程方法。它也没有回调的那些缺点。
本系列教程接下来的一篇将是学习asyncio
库如何的使用,快速掌握它的主要内容。后续我们还会深入探究asyncio
的优点与缺点,也会探讨 Python 生态中其他异步 I/O 方案和asyncio
的区别。