Celery是Python开发的分布式任务调度模块,本身不含消息服务,它使用第三方消息服务来传递任务,目前,Celery支持的消息服务有RabbitMQ、Redis甚至是数据库。
celery的5个主要部分:
-
Task
任务,有异步任务和定时任务。
-
Beat
定时任务调度器,根据配置定时将任务发送给Broker。
-
Broker
中间人,接收生产者发来的消息即 Task,将任务存入队列。任务的消费者是 Worker。
Celery 本身不提供队列服务,推荐用 Redis 或 RabbitMQ 实现队列服务。
-
Worker
执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
-
Backend
用于存储任务的执行结果,可以使用redis作为存储。
如果使用redis提供存储服务,需要 pip install redis
安装 redis 库。
1 2 3 4 5 6 7 8 9 10
| from celery import Celery
broker = 'redis://127.0.0.1:6379/1' app = Celery('demo', broker=broker)
@app.task def my_task(task_name): print("任务%s正在执行" %task_name)
|
此时在 tasks.py 所在的目录下运行 celery -A tasks worker --loglevel=info
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| (celery_venv) (base) ➜ celery celery -A tasks worker --loglevel=info
-------------- celery@tangmeijiandeMacBook-Pro.local v5.0.5 (singularity) --- ***** ----- -- ******* ---- macOS-10.15.7-x86_64-i386-64bit 2021-04-12 12:04:00 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: demo:0x10431dc70 - ** ---------- .> transport: redis://127.0.0.1:6379/1 - ** ---------- .> results: disabled:// - *** --- * --- .> concurrency: 16 (prefork) # workor并发数量. 可以通过 -c 参数指定开启的线程数 -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
[tasks] # 创建的任务列表 . tasks.my_task [2021-04-12 12:04:01,055: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1 [2021-04-12 12:04:01,062: INFO/MainProcess] mingle: searching for neighbors [2021-04-12 12:04:02,079: INFO/MainProcess] mingle: all alone [2021-04-12 12:04:02,091: INFO/MainProcess] celery@tangmeijiandeMacBook-Pro.local ready.
|
只是启动了 celery,此时就会在 redis 中就会创建以下内容:
1 2 3 4
| 127.0.0.1:6379[1]> keys * 1) "_kombu.binding.celeryev" 2) "_kombu.binding.celery" 3) "_kombu.binding.celery.pidbox"
|
调用任务
1 2 3 4 5
| from tasks import my_task my_task.delay()
>>> my_task.delay() <AsyncResult: a65130bb-8223-477e-8f0d-918e9fbb9a9b>
|
此时 celery客户端的输出:
1 2 3
| [2021-04-12 12:09:07,650: INFO/MainProcess] Received task: tasks.my_task[a65130bb-8223-477e-8f0d-918e9fbb9a9b] [2021-04-12 12:09:07,652: WARNING/ForkPoolWorker-16] 任务函数正在执行。。。。 # my_task.delay()添加的任务被Worker发现后,自动取出并运行。 [2021-04-12 12:09:07,653: INFO/ForkPoolWorker-16] Task tasks.my_task[a65130bb-8223-477e-8f0d-918e9fbb9a9b] succeeded in 0.0014049079999836067s: None # 这个None就是任务的返回值,因为这个任务没有返回值,所以为None。
|
存储任务的结果:
1 2 3 4 5 6 7 8 9 10 11 12 13
| from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('demo', broker=broker, backend=backend)
@app.task def my_task(a, b): print('任务函数正在执行。。。。') return a + b
|
此时运行celery之后:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| (celery_venv) (base) ➜ celery celery -A tasks worker --loglevel=info
-------------- celery@tangmeijiandeMacBook-Pro.local v5.0.5 (singularity) --- ***** ----- -- ******* ---- macOS-10.15.7-x86_64-i386-64bit 2021-04-12 12:20:50 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: demo:0x102fd2c70 - ** ---------- .> transport: redis://127.0.0.1:6379/1 - ** ---------- .> results: redis://127.0.0.1:6379/2 # 区别,设置了结果的存储位置 - *** --- * --- .> concurrency: 16 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
[tasks] . tasks.my_task
[2021-04-12 12:20:50,687: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1 [2021-04-12 12:20:50,695: INFO/MainProcess] mingle: searching for neighbors [2021-04-12 12:20:51,712: INFO/MainProcess] mingle: all alone [2021-04-12 12:20:51,723: INFO/MainProcess] celery@tangmeijiandeMacBook-Pro.local ready.
|
除了增加了result的存储位置,其他没有变化。
调用任务:
1 2 3 4 5
| from tasks import my_task res = my_task.delay() >>> res.result >>> res.failed()
|
使用 dir(res)
可以查看 AsyncResult 对象支持的方法。
在backend设置的存储设置中可以看到任务的返回值:
1 2 3 4
| 127.0.0.1:6379[2]> keys * celery-task-meta-f63fe60e-6505-413d-b052-44116b95d238 127.0.0.1:6379[2]> get celery-task-meta-f63fe60e-6505-413d-b052-44116b95d238 {"status": "SUCCESS", "result": 30, "traceback": null, "children": [], "date_done": "2021-04-12T04:28:07.373922", "task_id": "f63fe60e-6505-413d-b052-44116b95d238"}
|
使用小结(基础的使用流程)
-
创建Celery实例:
创建一个文件,一般使用tasks.py 作为文件名,在内部创建Celery实例。
Celery实例的配置最基础的三个参数是: app_name, broker, backend (其中backend 如果不需要任务返回值,可以不设置)
1 2 3 4 5 6
| from celery import Celery
broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2'
app = Celery('demo', broker=broker, backend=backend)
|
-
创建任务对象:
使用Celery的实例对象 app, 通过 @app.task 装饰器,创建任务。 注意只是创建了任务,并没有添加到broker中。
1 2 3 4
| @app.task def my_task(a, b): print('任务函数正在执行。。。。') return a + b
|
-
运行Celery服务:
使用 celery -A 使用的application的名称 worker --loglevel=info
celer 命令的组成部分:celery OPTIONS COMMAND
OPTIONS 常用 -A 使用的application的名称,如这里的 tasks.py 文件就使用 -A tasks
即可。
COMMAND 常用 worker ,表示开始运行celery实例。
COMMAND 也提供了一些options,如 --loglevel=info
用来设置日志等级。
注意:必须通过 -A 指定使用的应用文件,否则不会自动识别文件,会使用默认的celery,使用的是broker是rabbitmq。
celery -A tasks worker --loglevel=info
-
发送任务到broker中:
获取创建的任务对象,调用任务对象的delay()方法,如果任务有参数,直接传递到delay中即可。
将任务发送到broker队列中,Wroker会自动发现队列中的任务,然后自动执行,执行的结果存储到创建Celery实例时backend中指定的存储设备中。
调用任务对象的 delay() 方法的时候,会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。
1 2 3 4
| from tasks import my_task
res = my_task.delay(10, 20) res.result
|
想要使用 AsyncResult 对象检查任务的状态或者获得任务的返回值则必须在创建 Celery 实例的时候指定backend。
任务是在 Celery 服务启动的时候一起设置进去的,当调用 任务.delay()
的时候被发送到broker队列中执行。
任务是可以被重复执行的。只需要调用 任务.delay()
即可。
任务可以执行失败,但是不会导致 Celery 服务终止,res.result 会返回错误信息(函数调用的错误信息)。
未完待续。。。