摘要
本文部分内容来源于网络,个人收集整理,请勿传播
Celery
是一个基于python开发的一个简单、灵活、可靠的处理大量任务的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,它不仅支持实时处理也支持任务调度,如果你的业务场景中需要用到异步任务,就可以考虑使用celery
。
Celery
在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果,一般使用rabbitMQ or Redis
- 简单:一单熟悉了
celery
的工作流程后,配置和使用还是比较简单的 - 高可用:当任务执行失败或执行过程中发生连接中断,
celery
会自动尝试重新执行任务 - 快速:一个单进程的
celery
每分钟可处理上百万个任务 - 灵活:几乎
celery
的各个组件都可以被扩展及自定制 - 支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度
Celery基本工作流程图
组成部分
user
: 用户程序,用于告知celery
去执行一个任务。Celery Beat
: 任务调度器,自带的Celery Worker
: 执行任务的消费者,处理任务,通常设置多个Broker
: 消息代理,就是任务队列,用来存放任务,如RabbitMQ
或Redis
Producer
: 任务生产者,要执行的函数加上`@app.task`Result Backend
: 结果保存,如redis
Celery安装配置
1 | pip3 install celery |
celery
需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中)、Database(不推荐)
或者其他的消息中间件来充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。
Celery的默认broker是RabbitMQ, 仅需配置一行就可以
1 | broker_url = 'amqp://guest:guest@localhost:5672//' |
redis配置
1 | app.conf.broker_url = 'redis://localhost:6379/0' |
基本使用
- 执行 ctasks.py 创建worker(终端执行命令):
1 | celery worker -A ctasks -l info |
- 执行 job.py ,创建一个任务并获取任务ID:
1 | python3 job.py |
- 执行result.py ,检查任务状态并获取结果:
1 | python3 result.py |
ctasks.py
1 | import time |
job.py
1 | from ctasks import xxxxxx |
result.py
1 | from celery.result import AsyncResult |
多任务结构
1 | pro_cel |
pro_cel/celery_tasks/celery
1 | from celery import Celery |
pro_cel/celery_tasks/tasks.py
1 | import time |
pro_cel/check_result.py
1 | from celery.result import AsyncResult |
pro_cel/send_task.py
1 | import celery_tasks.tasks |
更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html
定时任务
设定时间让celery执行一个任务
1 | import datetime |
类似于contab的定时任务
1 | """ |
注:如果想要定时执行类似于crontab的任务,需要定制Scheduler来完成。
Flask中应用Celery
1 | pro_flask_celery/ |
app.py
1 | from flask import Flask |
celery_tasks/celery.py
1 | from celery import Celery |
celery_task/tasks.py
1 | import time |
Django-celery
- 同步请求:所有逻辑处理、数据计算任务在View中处理完毕后返回response。在View处理任务时用户处于等待状态,直到页面返回结果。
- 异步请求:View中先返回response,再在后台处理任务。用户无需等待,可以继续浏览网站。当任务处理完成时,我们可以再告知用户。
用户从Web前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列broker中,由空闲的worker去处理任务即可,处理的结果会暂存在后台数据库backend中。我们可以在一台机器或多台机器上同时起多个worker进程来实现分布式地并行处理任务。
主要基于celery4.x
版本
1 | # pip install celery 3.x |
建立消息队列
首先,我们必须拥有一个broker
消息队列用于发送和接收消息
1 | sudo apt-get install rabbitmq-server |
配置settings.py
在Django工程的settings.py文件中加入如下配置
1 | # mq配置 |
- 当
djcelery.setup_loader()
运行时,Celery便会去查看INSTALLD_APPS
下包含的所有app目录中的tasks.py
文件,找到标记为task的方法,将它们注册为celery task
。 BROKER_URL
和CELERY_RESULT_BACKEND
分别指代你的Broker
的代理地址以及Backend(result store)
数据存储地址。在Django中如果没有设置backend,会使用其默认的后台数据库用来存储数据。
注意,此处backend的设置是通过关键字
CELERY_RESULT_BACKEND
来配置,与一般celery配置方式不一样。
时区
1 | CELERY_ENABLE_UTC = False |
内存泄漏
默认每个worker跑完100个任务后才会自我销毁程序重建来释放内存,所以需要增加一个配置定义每个worker执行多少个任务后才会自我销毁重建。
1 | CELERYD_MAX_TASKS_PER_CHILD = 3 |
注册
在INSTALLED_APPS加入djcelery
1 | INSTALLED_APPS = ( |
创建tasks
- 在要使用该任务队列的app根目录下创建
tasks.py
- 在tasks.py中我们就可以编码实现我们需要执行的任务逻辑
- 在开始处
import task
,然后在要执行的任务方法开头用上装饰器@task - 与一般的celery不同,
tasks.py
必须建在各app的根目录下,且不能随意命名。
1 | # tasks.py |
生产任务
在需要执行该任务的View中,通过tadd.delay
的方式来创建任务,并送入消息队列
1 | from django.http import HttpResponse |
启动worker的命令
1 | # 先启动服务器 |
创建定时器
同步数据库
1 | python manage.py migrate |
第一种settings配置
任务会自动把数据添加到数据库中
1 | from celery.schedules import crontab |
启动beat
1 | python manage.py celery beat --loglevel=info |
直接在数据库中添加任务
flower
Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现
1 | # 安装flower: |
django-celery-beat
主要基于celery4.x
版本
1 | # pip install celery 4.x |
celery4.x版本的django-celery-beat在djang-admin后台添加task后系统不会对修改后的任务即时生效,而是需要重启celery-beat服务才能生效
已验证,任务修改后可以直接生效
建立消息队列
首先,我们必须拥有一个broker
消息队列用于发送和接收消息
1 | sudo apt-get install rabbitmq-server |
注册
1 | 'django_celery_beat', |
配置
1 | # Celery settings |
创建app
在项目配置中创建celery.py
文件
1 | # 绝对引用,防止相对引用被当前文件名影响 |
在__init__.py
中配置
1 | from __future__ import absolute_import, unicode_literals |
同步数据库
1 | python manage.py migrate |
启动
1 | celery worker -A anthill -c 10 --loglevel=info -B |
目前版本修改或者新增任务无需再重启beat
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#using-custom-scheduler-classes
配置计划任务
1 | from celery.schedules import crontab |
Django-celery-results
使用Django
的ORM、CACHE来保存result backend
安装
1 | pip install django-celery-results |
注册
1 | INSTALLED_APPS = ( |
同步数据库
1 | python manage.py migrate django_celery_results |
配置
1 | CELERY_RESULT_BACKEND = 'django-db' |