并行分布式框架 Celery
# 概念
# 异步任务
异步任务是 web 开发中一个很常见的方法。对于一些耗时耗资源的操作,往往从主应用中隔离,通过异步的方式执行。简而言之,做一个注册的功能,在用户使用邮箱注册成功之后,需要给该邮箱发送一封激活邮件。如果直接放在应用中,则调用发邮件的过程会遇到网络 IO 的阻塞,比好优雅的方式则是使用异步任务,应用在业务逻辑中触发一个异步任务。
# 生产者-消费者模式
在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
单单抽象出生产者和消费者,还够不上是生产者消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过消息队列(缓冲区)来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给消息队列,消费者不找生产者要数据,而是直接从消息队列里取,消息队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个消息队列就是用来给生产者和消费者解耦的。
解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
# 组件
Task
包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
- Producer
调用了 Celery 提供的 API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
- Celery Beat
任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
Broker
消息代理,又称消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。Celery 目前支持 RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper 等作为消息代理,但适用于生产环境的只有 RabbitMQ 和 Redis, 官方推荐 RabbitMQ。
- Celery Worker
执行任务的消费者,它实时监控消息队列,获取队列中调度的任务并执行。生产中通常会在多台服务器运行多个消费者来提高执行效率。
- Result Backend
任务处理完后保存状态信息和结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。
# 架构
Celery 的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
- 消息中间件
Celery 本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ,Redis,MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental),IronMQ
- 任务执行单元
Worker 是 Celery 提供的任务执行的单元,worker 并发的运行在分布式的系统节点中。
- 任务结果存储
Task result store 用来存储 Worker 执行的任务的结果,Celery 支持以不同方式存储任务的结果,包括 AMQP,redis,memcached,mongodb,SQLAlchemy,Django ORM,Apache Cassandra,IronCache 等。
# 最佳实践
ref:Deni Bertovic :: Celery - Best Practices (opens new window),不习惯看英文的可以看这里 (opens new window)
不要使用数据库作为 AMQP 代理 我们经常为了方便(不需要新的中间件和配置即可使用)而使用数据库作为消息队列,但是在生产环境中可能因为轮询的 io 操作导致数据库崩溃。
使用更多的队列(而不仅仅是默认 default 队列) 默认不做配置的时候,所有的工作都会交给 default 队列去做,可能导致不太重要的任务 b 在做而重要的任务 a 一直在等待
使用优先 workers
- 手动定义队列
CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'), Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'), )
1
2
3
4
5- route 决定每个任务进入的队列
CELERY_ROUTES = { 'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'}, 'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'}, }
1
2
3
4运行之后我们可以看到如下信息:
celery worker -E -l INFO -n workerA -Q for_task_A celery worker -E -l INFO -n workerB -Q for_task_B
1
2使用 celery 的错误处理
retry
机制@app.task(bind=True, default_retry_delay=300, max_retries=5) def my_task_a(): try: print("doing stuff here...") except SomeNetworkException as e: print("maybe do some clean up here....") self.retry(e)
1
2
3
4
5
6
7由于第三方接口或者网络问题导致的错误,应该使用重试机制。
-
Flower 项目是一个很好的工具,可以监控你的 tasks 和 workers。
只有在你真正需要的时候才记录结果
需要注意的是这里的结果是指 task 的退出状态不是任务执行的作业的结果,通过
CELERY_IGNORE_RESULT = True
配置禁用记录保存不要将 Database/ORM 对象传递给任务
序列化的对象可能包含陈旧数据。您需要做的是向任务提供 User id,并让任务向数据库请求一个新的 User 对象。
# 面试题
- 简述 celery 是什么以及应用场景?
- 简述 celery 运行机制?
- celery 如何实现定时任务?
# 启用Celery的定时任务需要设置CELERYBEAT_SCHEDULE 。
CELERYBEAT_SCHEDULE='djcelery.schedulers.DatabaseScheduler'#定时任务
'创建定时任务'
# 通过配置CELERYBEAT_SCHEDULE:每30秒调用task.add
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': timedelta(seconds=30),
'args': (16, 16)
},
}
2
3
4
5
6
7
8
9
10
11
12
13
python 测试开发 django-160.Celery 定时任务 (beat) - 上海-悠悠 - 博客园 (opens new window) 4. 简述 celery 多任务结构目录
pro_cel
├── celery_tasks # celery相关文件夹
│ ├── celery.py # celery连接和配置相关文件
│ └── tasks.py # 所有任务函数
├── check_result.py # 检查结果
└── send_task.py # 触发任务
2
3
4
5
6
- celery 中装饰器 @app.task 和 @shared_task 的区别?
在 celery V4 中,我们如下使用:在编写可重用库或 django 应用程序的实例中如下使用。例如,如果您正在编写一组开源任务,允许您使用 celery 管理 aws ec2 实例,那么您可以使用
from myproject.tasks import app @app.task def foo(): pass
1
2
3
4
5shared_task
,这样任务就可以在 celery 上运行,可以让使用库的人自行配置 celery。ref:python - Difference between different ways to create celery task - Stack Overflow (opens new window)from celery import shared_task @shared_task def foo(): pass
1
2
3
4
5
# 示例
注意
此处直接粘贴复制的,没有验证过,可能与目前最新版存在配置差异,需要读者自行尝试。
# Getting Starting
使用 celery 包含三个方面,其一是定义任务函数,其二是运行 celery 服务,最后是客户应用程序的调用。
创建一个文件 tasks.py
输入下列代码:
from celery import Celery
brokers = 'redis://127.0.0.1:6379/5'
backend = 'redis://127.0.0.1:6379/6'
app = Celery('tasks', broker=broker, backend=backend)
@app.task
def add(x, y):
return x + y
2
3
4
5
6
7
8
9
10
11
上述代码导入了 celery,然后创建了 celery 实例 app,实力话的过程中,指定了任务名tasks
(和文件名一致),传入了 broker 和 backend。然后创建了一个任务函数add
。
下面就启动 celery 服务
在当前命令行终端运行:
celery -A tasks worker --loglevel=info
2
此时会看见输出,包括注册的任务。
客户端程序如何调用呢?打开一个命令行,进入 Python 环境
In [0]:from tasks import add
In [1]: r = add.delay(2, 2)
In [2]: add.delay(2, 2)
Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d>
In [3]: r = add.delay(3, 3)
In [4]: r.re
r.ready r.result r.revoke
In [4]: r.ready()
Out[4]: True
In [6]: r.result
Out[6]: 6
In [7]: r.get()
Out[7]: 6
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
在 celery 命令行可以看见 celery 执行的日志:
[2015-09-20 21:37:06,086: INFO/MainProcess] Task proj.tasks.add[76beb980-0f55-4629-a4fb-4a1776428ea8] succeeded in 0.00089102005586s: 6
2
打开 backend 的 redis,也可以看见 celery 执行的信息。
现在是在 python 环境中调用的 add 函数,实际上通常在应用程序中调用这个方法。需要注意,如果把返回值赋值给一个变量,那么原来的应用程序也会被阻塞,需要等待异步任务返回的结果。因此,实际使用中,不需要把结果赋值。
# 计划任务
上述的使用是简单的配置,下面介绍一个更健壮的方式来使用 celery。首先创建一个 python 包,celery 服务,姑且命名为 proj。目录文件如下:
☁ proj tree
.
├── __init__.py
├── celery.py # 创建 celery 实例
├── config.py # 配置文件
└── tasks.py # 任务函数
2
3
4
5
6
首先是 celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from celery import Celery
app = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.config')
if __name__ == '__main__':
app.start()
2
3
4
5
6
7
8
9
10
11
12
这一次创建 app,并没有直接指定 broker 和 backend。而是在配置文件中。
# config.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
BROKER_URL = 'redis://127.0.0.1:6379/6'
2
3
4
5
6
7
8
9
10
剩下的就是 tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from proj.celery import app
@app.task
def add(x, y):
return x + y
2
3
4
5
6
7
8
9
使用方法也很简单,在 proj 的同一级目录执行 celery:
celery -A proj worker -l info
现在使用任务也很简单,直接在客户端代码调用 proj.tasks 里的函数即可。
# Scheduler
一种常见的需求是每隔一段时间执行一个任务。配置如下
# config.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
BROKER_URL = 'redis://127.0.0.1:6379/6'
CELERY_TIMEZONE = 'Asia/Shanghai'
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'proj.tasks.add',
'schedule': timedelta(seconds=30),
'args': (16, 16)
},
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
注意配置文件需要指定时区。这段代码表示每隔 30 秒执行 add 函数。
一旦使用了 scheduler, 启动 celery 需要加上-B 参数
celery -A proj worker -B -l info
# crontab
计划任务当然也可以用 crontab 实现,celery 也有 crontab 模式。修改 config.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
BROKER_URL = 'redis://127.0.0.1:6379/6'
CELERY_TIMEZONE = 'Asia/Shanghai'
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
# Executes every Monday morning at 7:30 A.M
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
总而言之,scheduler 的切分度更细,可以精确到秒。crontab 模式就不用说了。当然 celery 还有更高级的用法,比如多个机器使用,启用多个 worker 并发处理等。