python框架-Celery任务队列

摘要

本文部分内容来源于网络,个人收集整理,请勿传播

Celery是一个基于python开发的一个简单、灵活、可靠的处理大量任务的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,它不仅支持实时处理也支持任务调度,如果你的业务场景中需要用到异步任务,就可以考虑使用celery

Celery在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果,一般使用rabbitMQ or Redis

  • 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活:几乎celery的各个组件都可以被扩展及自定制
  • 支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度

Celery基本工作流程图

组成部分

  • user: 用户程序,用于告知celery去执行一个任务。
  • Celery Beat: 任务调度器,自带的
  • Celery Worker: 执行任务的消费者,处理任务,通常设置多个
  • Broker: 消息代理,就是任务队列,用来存放任务,如RabbitMQRedis
  • Producer: 任务生产者,要执行的函数加上`@app.task`
  • Result Backend: 结果保存,如redis

Celery安装配置

1
pip3 install celery

celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中)、Database(不推荐)或者其他的消息中间件来充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。

Celery的默认broker是RabbitMQ, 仅需配置一行就可以

1
broker_url = 'amqp://guest:guest@localhost:5672//'

redis配置

1
2
app.conf.broker_url = 'redis://localhost:6379/0'
app.conf.result_backend = 'redis://localhost:6379/0'

基本使用

  • 执行 ctasks.py 创建worker(终端执行命令):
1
celery worker -A ctasks -l info
  • 执行 job.py ,创建一个任务并获取任务ID:
1
python3 job.py
  • 执行result.py ,检查任务状态并获取结果:
1
python3 result.py

ctasks.py

1
2
3
4
5
6
7
8
9
import time
from celery import Celery

app = Celery('tasks', broker='redis://127.0.0.1:6379', backend='redis://127.0.0.1:6379')

@app.task
def xxxxxx(x, y):
time.sleep(10)
return x + y

job.py

1
2
3
4
5
from ctasks import xxxxxx

# 立即告知celery去执行xxxxxx任务,并传入两个参数
result = xxxxxx.delay(4, 4)
print(result.id)

result.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from celery.result import AsyncResult
from ctasks import app

async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)

if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

多任务结构

1
2
3
4
5
6
pro_cel
├── celery_tasks# celery相关文件夹
│ ├── celery.py # celery连接和配置相关文件
│ └── tasks.py # 所有任务函数
├── check_result.py # 检查结果
└── send_task.py # 触发任务

pro_cel/celery_tasks/celery

1
2
3
4
5
6
7
8
9
10
11
from celery import Celery

celery = Celery('tasks',
broker='redis://127.0.0.1:6379',
backend='redis://192.168.0.111127.0.0.1:6379',
include=['celery_tasks.tasks'])

# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False

pro_cel/celery_tasks/tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
import time
from .celery import celery

@celery.task
def xxxxx(*args, **kwargs):
time.sleep(5)
return "任务结果"

@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"

pro_cel/check_result.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from celery.result import AsyncResult
from celery_tasks.celery import celery

async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)

if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

pro_cel/send_task.py

1
2
3
4
5
6
import celery_tasks.tasks

# 立即告知celery去执行xxxxxx任务,并传入两个参数
result = celery_tasks.tasks.xxxxx.delay(4, 4)

print(result.id)

更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html

定时任务

设定时间让celery执行一个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import datetime
from celery_tasks.tasks import xxxxx
"""
from datetime import datetime

v1 = datetime(2017, 4, 11, 3, 0, 0)
print(v1)

v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)

"""
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())

s10 = datetime.timedelta(seconds=10)
ctime_x = utc_ctime + s10

# 使用apply_async并设定时间
result = xxxxx.apply_async(args=[1, 3], eta=ctime_x)
print(result.id)

类似于contab的定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
"""
celery beat -A proj
celery worker -A proj -l info

"""
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='amqp://127.0.0.1:5672', backend='amqp://127.0.0.1:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

app.conf.beat_schedule = {
# 'add-every-10-seconds': {
# 'task': 'proj.s1.add1',
# 'schedule': 10.0,
# 'args': (16, 16)
# },
'add-every-12-seconds': {
'task': 'proj.s1.add1',
'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
'args': (16, 16)
},
}

注:如果想要定时执行类似于crontab的任务,需要定制Scheduler来完成。

Flask中应用Celery

1
2
3
4
5
pro_flask_celery/
├── app.py
├── celery_tasks
├── celery.py
└── tasks.py

app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from flask import Flask
from celery.result import AsyncResult

from celery_tasks import tasks
from celery_tasks.celery import celery

app = Flask(__name__)

TASK_ID = None

@app.route('/')
def index():
global TASK_ID
result = tasks.xxxxx.delay()
# result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0))
TASK_ID = result.id

return "任务已经提交"


@app.route('/result')
def result():
global TASK_ID
result = AsyncResult(id=TASK_ID, app=celery)
if result.ready():
return result.get()
return "xxxx"


if __name__ == '__main__':
app.run()

celery_tasks/celery.py

1
2
3
4
5
6
7
8
9
10
11
12
from celery import Celery
from celery.schedules import crontab

celery = Celery('xxxxxx',
broker='redis://192.168.10.48:6379',
backend='redis://192.168.10.48:6379',
include=['celery_tasks.tasks'])

# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False

celery_task/tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time
from .celery import celery

@celery.task
def hello(*args, **kwargs):
print('执行hello')
return "hello"

@celery.task
def xxxxx(*args, **kwargs):
print('执行xxxxx')
return "xxxxx"

@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"

Django-celery

  • 同步请求:所有逻辑处理、数据计算任务在View中处理完毕后返回response。在View处理任务时用户处于等待状态,直到页面返回结果。
  • 异步请求:View中先返回response,再在后台处理任务。用户无需等待,可以继续浏览网站。当任务处理完成时,我们可以再告知用户。

用户从Web前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列broker中,由空闲的worker去处理任务即可,处理的结果会暂存在后台数据库backend中。我们可以在一台机器或多台机器上同时起多个worker进程来实现分布式地并行处理任务。

主要基于celery4.x版本

1
2
# pip install celery 3.x
pip install django-celery

建立消息队列

首先,我们必须拥有一个broker消息队列用于发送和接收消息

1
2
sudo apt-get install rabbitmq-server
sudo apt-get install redis

配置settings.py

在Django工程的settings.py文件中加入如下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# mq配置
import djcelery
djcelery.setup_loader()
BROKER_URL= 'amqp://guest@localhost//'
CELERY_RESULT_BACKEND = 'amqp://guest@localhost//'
#############################
# celery 配置信息 start
#############################
import djcelery
djcelery.setup_loader()
# ​代理人,它负责分发任务给worker去执行
BROKER_URL = 'redis://127.0.0.1:63790/9'
# ​如果没有设置CELERY_RESULT_BACKEND,默认没有配置,此时Django会使用默认的数据库
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:63790/9'
# BROKER_TRANSPORT = 'redis'
# 导入目标任务文件
# CELERY_IMPORTS = ('base.tasks')
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERYD_MAX_TASKS_PER_CHILD = 3
  • djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task
  • BROKER_URLCELERY_RESULT_BACKEND分别指代你的Broker的代理地址以及Backend(result store)数据存储地址。在Django中如果没有设置backend,会使用其默认的后台数据库用来存储数据。

注意,此处backend的设置是通过关键字CELERY_RESULT_BACKEND来配置,与一般celery配置方式不一样。

时区

1
CELERY_ENABLE_UTC = False

内存泄漏

默认每个worker跑完100个任务后才会自我销毁程序重建来释放内存,所以需要增加一个配置定义每个worker执行多少个任务后才会自我销毁重建。

1
2
CELERYD_MAX_TASKS_PER_CHILD = 3
# 每个worker最多执行3个任务就会被销毁,可防止内存泄露

注册

在INSTALLED_APPS加入djcelery

1
2
3
4
5
6
INSTALLED_APPS = (
……
'qv',
'djcelery'
……
)

创建tasks

  • 在要使用该任务队列的app根目录下创建tasks.py
  • 在tasks.py中我们就可以编码实现我们需要执行的任务逻辑
  • 在开始处import task,然后在要执行的任务方法开头用上装饰器@task
  • 与一般的celery不同,tasks.py必须建在各app的根目录下,且不能随意命名。
1
2
3
4
5
6
7
8
# tasks.py
from celery import task
import time

@task
def tadd(a,b):
time.sleep(10)
return int(a) + int(b)

生产任务

在需要执行该任务的View中,通过tadd.delay的方式来创建任务,并送入消息队列

1
2
3
4
5
6
from django.http import HttpResponse
from .tasks import tadd

def demo(request):
tadd.delay(10,20)
return HttpResponse('ok')

启动worker的命令

1
2
3
4
# 先启动服务器
python manage.py runserver
# 再启动worker
python manage.py celery worker -c 4 --loglevel=info

创建定时器

同步数据库

1
python manage.py migrate

第一种settings配置

任务会自动把数据添加到数据库中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from celery.schedules import crontab
from celery.schedules import timedelta

# 使用了django-celery默认的数据库调度模型,任务执行周期都被存在数据库中.默认使用celery.beat.PersistentScheduler
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# 定时器策略,​设置定时的时间配置, 可以精确到秒,分钟,小时,天,周等
CELERYBEAT_SCHEDULE = {
# 定时任务一: 每隔30s运行一次
u'mytask1': {
"task": "demo.tasks.tadd",
# "schedule": crontab(minute='*/2'),
"schedule": timedelta(seconds=30),
"args": (1,2),
},
}

启动beat

1
python manage.py celery beat --loglevel=info

直接在数据库中添加任务

flower

Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现

1
2
3
4
5
# 安装flower:
pip install flower
# 启动flower(默认会启动一个webserver,端口为5555)
python manage.py celery flower
# 进入http://localhost:5555即可查看

django-celery-beat

主要基于celery4.x版本

1
2
# pip install celery 4.x
pip install django_celery_beat

官方文档

celery4.x版本的django-celery-beat在djang-admin后台添加task后系统不会对修改后的任务即时生效,而是需要重启celery-beat服务才能生效

已验证,任务修改后可以直接生效

建立消息队列

首先,我们必须拥有一个broker消息队列用于发送和接收消息

1
2
sudo apt-get install rabbitmq-server
sudo apt-get install redis

注册

1
'django_celery_beat',

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Celery settings
# ​代理人,它负责分发任务给worker去执行
CELERY_BROKER_URL = 'redis://127.0.0.1:63790/7'
# ​如果没有设置CELERY_RESULT_BACKEND,默认没有配置,此时Django会使用默认的数据库
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
# json or pickle
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERY_RESULT_EXPIRES = 3600
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
# CELERY_WORKER_LOG_FORMAT = '%(message)s'
CELERY_WORKER_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
# CELERY_WORKER_TASK_LOG_FORMAT = '%(message)s'
CELERY_TASK_EAGER_PROPAGATES = True
CELERY_REDIRECT_STDOUTS = True
CELERY_REDIRECT_STDOUTS_LEVEL = "INFO"
CELERY_WORKER_HIJACK_ROOT_LOGGER = False

创建app

在项目配置中创建celery.py文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 绝对引用,防止相对引用被当前文件名影响
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 为celery程序设置DJANGO_SETTINGS_MODULE环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'anthill.settings')

from django.conf import settings

app = Celery('anthill')

# 从Django的设置文件中导入CELERY设置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 从所有已注册的app中加载任务模块
app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])
# app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

__init__.py中配置

1
2
3
4
5
6
from __future__ import absolute_import, unicode_literals

# 这将保证celery app总能在django应用启动时启动
from .celery import app as celery_app

__all__ = ['celery_app']

同步数据库

1
python manage.py migrate

启动

1
2
3
4
5
celery worker -A anthill -c 10 --loglevel=info -B
celery beat -A anthill --loglevel=info -S django
# 启动的时候指定调度器数据库,默认
# celery -A anthill beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
celery flower -A anthill

目前版本修改或者新增任务无需再重启beat

http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#using-custom-scheduler-classes

配置计划任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from celery.schedules import crontab
from celery.schedules import timedelta

# 使用了django-celery默认的数据库调度模型,任务执行周期都被存在数据库中.默认使用celery.beat.PersistentScheduler
# CELERY_BEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# # 定时器策略,​设置定时的时间配置, 可以精确到秒,分钟,小时,天,周等
CELERY_BEAT_SCHEDULE = {
# 定时任务一: 每隔30s运行一次
u'mytask1': {
"task": "demo.tasks.tadd",
# "schedule": crontab(minute='*/2'),
"schedule": timedelta(seconds=30),
"args": (1,2),
},
}

############

app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}

app.conf.timezone = 'UTC'

###########

from celery.schedules import crontab

app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}

###########

from celery.schedules import solar

app.conf.beat_schedule = {
# Executes at sunset in Melbourne
'add-at-melbourne-sunset': {
'task': 'tasks.add',
'schedule': solar('sunset', -37.81753, 144.96715),
'args': (16, 16),
},
}

##########

from celery.schedules import timedelta

app.conf.beat_schedule = {
# Executes at sunset in Melbourne
'add-at-melbourne-sunset': {
'task': 'tasks.add',
'schedule': timedelta(seconds=30),
'args': (16, 16),
},
}

Django-celery-results

使用Django的ORM、CACHE来保存result backend

安装

1
pip install django-celery-results

注册

1
2
3
4
INSTALLED_APPS = (
...,
'django_celery_results',
)

同步数据库

1
python manage.py migrate django_celery_results

配置

1
2
CELERY_RESULT_BACKEND = 'django-db'
CELERY_RESULT_BACKEND = 'django-cache'