0%

Celery 基本使用

Celery是Python开发的分布式任务调度模块,本身不含消息服务,它使用第三方消息服务来传递任务,目前,Celery支持的消息服务有RabbitMQ、Redis甚至是数据库。

celery的5个主要部分:

  • Task
    任务,有异步任务和定时任务。

  • Beat
    定时任务调度器,根据配置定时将任务发送给Broker。

  • Broker
    中间人,接收生产者发来的消息即 Task,将任务存入队列。任务的消费者是 Worker。
    Celery 本身不提供队列服务,推荐用 Redis 或 RabbitMQ 实现队列服务。

  • Worker
    执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。

  • Backend
    用于存储任务的执行结果,可以使用redis作为存储。

celery基本架构

如果使用redis提供存储服务,需要 pip install redis 安装 redis 库。

1
2
3
4
5
6
7
8
9
10
# tasks.py
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
app = Celery('demo', broker=broker)

# celery 任务函数,只是创建了任务
@app.task
def my_task(task_name):
print("任务%s正在执行" %task_name)

此时在 tasks.py 所在的目录下运行 celery -A tasks worker --loglevel=info:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
(celery_venv) (base) ➜  celery celery -A tasks worker --loglevel=info

-------------- celery@tangmeijiandeMacBook-Pro.local v5.0.5 (singularity)
--- ***** -----
-- ******* ---- macOS-10.15.7-x86_64-i386-64bit 2021-04-12 12:04:00
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: demo:0x10431dc70
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 16 (prefork) # workor并发数量. 可以通过 -c 参数指定开启的线程数
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks] # 创建的任务列表
. tasks.my_task

[2021-04-12 12:04:01,055: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2021-04-12 12:04:01,062: INFO/MainProcess] mingle: searching for neighbors
[2021-04-12 12:04:02,079: INFO/MainProcess] mingle: all alone
[2021-04-12 12:04:02,091: INFO/MainProcess] celery@tangmeijiandeMacBook-Pro.local ready.

只是启动了 celery,此时就会在 redis 中就会创建以下内容:

1
2
3
4
127.0.0.1:6379[1]> keys *
1) "_kombu.binding.celeryev"
2) "_kombu.binding.celery"
3) "_kombu.binding.celery.pidbox"

调用任务

1
2
3
4
5
from tasks import my_task
my_task.delay() # 会将任务my_task添加到broker队列中。

>>> my_task.delay() # 任务调用会有一个返回值,将会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。
<AsyncResult: a65130bb-8223-477e-8f0d-918e9fbb9a9b>

此时 celery客户端的输出:

1
2
3
[2021-04-12 12:09:07,650: INFO/MainProcess] Received task: tasks.my_task[a65130bb-8223-477e-8f0d-918e9fbb9a9b] 
[2021-04-12 12:09:07,652: WARNING/ForkPoolWorker-16] 任务函数正在执行。。。。 # my_task.delay()添加的任务被Worker发现后,自动取出并运行。
[2021-04-12 12:09:07,653: INFO/ForkPoolWorker-16] Task tasks.my_task[a65130bb-8223-477e-8f0d-918e9fbb9a9b] succeeded in 0.0014049079999836067s: None # 这个None就是任务的返回值,因为这个任务没有返回值,所以为None。

存储任务的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
from celery import Celery

# broker使用的存储方式
broker = 'redis://127.0.0.1:6379/1'
# 存储结果使用的方式
backend = 'redis://127.0.0.1:6379/2'

app = Celery('demo', broker=broker, backend=backend)

@app.task
def my_task(a, b):
print('任务函数正在执行。。。。')
return a + b # 新建的任务有返回值

此时运行celery之后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
(celery_venv) (base) ➜  celery celery -A tasks worker --loglevel=info

-------------- celery@tangmeijiandeMacBook-Pro.local v5.0.5 (singularity)
--- ***** -----
-- ******* ---- macOS-10.15.7-x86_64-i386-64bit 2021-04-12 12:20:50
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: demo:0x102fd2c70
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/2 # 区别,设置了结果的存储位置
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. tasks.my_task

[2021-04-12 12:20:50,687: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2021-04-12 12:20:50,695: INFO/MainProcess] mingle: searching for neighbors
[2021-04-12 12:20:51,712: INFO/MainProcess] mingle: all alone
[2021-04-12 12:20:51,723: INFO/MainProcess] celery@tangmeijiandeMacBook-Pro.local ready.

除了增加了result的存储位置,其他没有变化。

调用任务:

1
2
3
4
5
from tasks import my_task
res = my_task.delay() # res 的值为一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。

>>> res.result # 30
>>> res.failed() # False

使用 dir(res) 可以查看 AsyncResult 对象支持的方法。

在backend设置的存储设置中可以看到任务的返回值:

1
2
3
4
127.0.0.1:6379[2]> keys *
celery-task-meta-f63fe60e-6505-413d-b052-44116b95d238
127.0.0.1:6379[2]> get celery-task-meta-f63fe60e-6505-413d-b052-44116b95d238
{"status": "SUCCESS", "result": 30, "traceback": null, "children": [], "date_done": "2021-04-12T04:28:07.373922", "task_id": "f63fe60e-6505-413d-b052-44116b95d238"}

使用小结(基础的使用流程)

  1. 创建Celery实例:
    创建一个文件,一般使用tasks.py 作为文件名,在内部创建Celery实例。
    Celery实例的配置最基础的三个参数是: app_name, broker, backend (其中backend 如果不需要任务返回值,可以不设置)

    1
    2
    3
    4
    5
    6
    from celery import Celery

    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'

    app = Celery('demo', broker=broker, backend=backend)
  2. 创建任务对象:
    使用Celery的实例对象 app, 通过 @app.task 装饰器,创建任务。 注意只是创建了任务,并没有添加到broker中。

    1
    2
    3
    4
    @app.task
    def my_task(a, b):
    print('任务函数正在执行。。。。')
    return a + b
  3. 运行Celery服务:
    使用 celery -A 使用的application的名称 worker --loglevel=info
    celer 命令的组成部分:celery OPTIONS COMMAND
    OPTIONS 常用 -A 使用的application的名称,如这里的 tasks.py 文件就使用 -A tasks 即可。
    COMMAND 常用 worker ,表示开始运行celery实例。
    COMMAND 也提供了一些options,如 --loglevel=info 用来设置日志等级。

    注意:必须通过 -A 指定使用的应用文件,否则不会自动识别文件,会使用默认的celery,使用的是broker是rabbitmq。

    celery -A tasks worker --loglevel=info

  4. 发送任务到broker中:
    获取创建的任务对象,调用任务对象的delay()方法,如果任务有参数,直接传递到delay中即可。
    将任务发送到broker队列中,Wroker会自动发现队列中的任务,然后自动执行,执行的结果存储到创建Celery实例时backend中指定的存储设备中。
    调用任务对象的 delay() 方法的时候,会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。

    1
    2
    3
    4
    from tasks import my_task

    res = my_task.delay(10, 20)
    res.result # 30

想要使用 AsyncResult 对象检查任务的状态或者获得任务的返回值则必须在创建 Celery 实例的时候指定backend。
任务是在 Celery 服务启动的时候一起设置进去的,当调用 任务.delay() 的时候被发送到broker队列中执行。
任务是可以被重复执行的。只需要调用 任务.delay() 即可。
任务可以执行失败,但是不会导致 Celery 服务终止,res.result 会返回错误信息(函数调用的错误信息)。

未完待续。。。