文章详情页 您现在的位置是:网站首页>文章详情

Django应用celery,实现多worker,多队列

图片丢失 Amir 发表于:2019年6月15日 16:24 分类:【日常笔记 1394次阅读

一、原理

celery是一个分布式的任务调度模块,那么怎么实现它的分布式功能呢,celery可以支持多台不同的计算机执行不同的任务或者相同的任务。

简单理解:

可以有多个"消息队列"message Queue),不同的消息可以指定发送给不同的Message Queue

而这是通过Exchange来实现的,发送消息到"消息队列"中时,可以指定routiing_keyExchange通过routing_key来吧消息路由(routes)到不同的"消息队列"中去。

2.png  

 

exchange 对应 一个消息队列(queue),即:通过"消息路由"的机制使exchange对应queue,每个queue对应每个worker

二、实例目录结构

3.png  

三、案例实现

1. 简单介绍目录结构

首先创建一个django项目,创建一个app测试用。

app的同级目录下创建关于celery的一个目录(celery_tasks),在celery_tasks创建一个config.py文件用来放配置文件,创建一个main.py文件,即为入口、配置加载文件。并在同级目录下创建一个/多个app(比如放送邮件,发送短信等模块,),然后在此app下创建存放异步任务代码的tasks.py文件,目录机构如上图。

2. main.py文件介绍如注释

from celery import Celery

import os

# celery使用django配置文件进行设置
if not os.getenv('DJANGO_SETTINGS_MODULE'):
    os.environ['DJANGO_SETTINGS_MODULE'] = 'amirtestcelery.settings'
# 创建celery应用
app = Celery('amirtestcelery')
# 导入celery配置文件
app.config_from_object('celery_tasks.config')
# 自动注册celery任务需要启动哪个模块下,则加入列表中
app.autodiscover_tasks(['celery_tasks.appone', 'celery_tasks.apptwo'])

 

3. 分别在tasks.py文件下创建异步任务

4.png  

5.png  

6.png  

这里提醒的是 @app.task(name='apptwo_mult')   这里的name是为这个方法提供一个命名空间(唯一),后面会用到

4. config.py文件介绍(关键)

import datetime
from celery.schedules import crontab
from kombu import Exchange, Queue

BROKER_URL = 'redis://localhost:6379/10'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/11'
CELERY_TIMEZONE = 'Asia/Shanghai'

# 队列
CELERY_QUEUES = (
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("appone_add_queue", Exchange("appone_add_queue"), routing_key="appone_add_router"),
    Queue("apptwo_mult_queue", Exchange("apptwo_mult_queue"), routing_key="apptwo_mult_router")
)
# 路由
CELERY_ROUTES = {
    'appone_add': {"queue": "appone_add_queue", "routing_key": "appone_add_router"},
    'apptwo_mult': {"queue": "apptwo_mult_queue", "routing_key": "apptwo_mult_router"}
}

# 定时任务配置如下
CELERYBEAT_SCHEDULE = {
    'beat_task1': {
        'task': 'appthree_comment',
        'schedule': datetime.timedelta(seconds=2),
        'args': (2, 8)
    },
    'beat_task2': {
        'task': 'appthree_comment',
        'schedule': crontab(hour=16, minute=32),
        'args': (4, 5)
    }
}

# 单独对一个任务启动命令 会启动指定queue
# celery -A celery_tasks.main  worker -l info -n workerA.%h -Q appone_add_queue
# celery -A celery_tasks.main  worker -l info -n workerB.%h -Q apptwo_mult_queue

# 当任务没有指定queue 则任务会加入default 队列 beat(定时任务也会加入default队列)
# celery -A celery_tasks.main  worker -l info -n worker -Q celery

# 启动定时任务
# celery beat -A celery_tasks.main -l INFO


# -A 表示 应用目录  这里是celery_tasks.main
# -B 表示 定时任务
# -l 表示日志级别
# -n woker
# -Q 队列名
# .%h 对应不同主机ip  如果默认localhost,所以可以省略.%h

 

BROKER_URL 表示生产者存放任务的数据库,我这用的是redis

CELERY_RESULT_BACKEND  表示消费者存放的结果

CELERY_TIMEZONE   设置时区呗

CELERY_QUEUES 创建队列

7.png  

 

8.png  

 

 

5. 启动命令(代码都写好了,是时候启动了)

# 单独对一个任务启动命令 会启动指定queue
# celery -A celery_tasks.main  worker -l info -n workerA.%h -Q appone_add_queue
# celery -A celery_tasks.main  worker -l info -n workerB.%h -Q apptwo_mult_queue

# 当任务没有指定queue 则任务会加入default 队列 beat(定时任务也会加入default队列)
# celery -A celery_tasks.main  worker -l info -n worker -Q celery

# 启动定时任务
# celery beat -A celery_tasks.main -l INFO


# -A 表示 应用目录  这里是celery_tasks.main
# -B 表示 定时任务
# -l 表示日志级别 这是英文小写l不是数字1
# -n woker 自定义
# -Q 队列名
# .%h 对应不同主机ip  如果默认localhost,所以可以省略.%h

6. 随笔

Celery是典型的生产者与消费者模式,生产者(brokeradd任务到队列中,消费者(worker)去队列中BROKER_URL拿任务,执行完之后,结果放CELERY_RESULT_BACKEND中。创建多个队列之后,启动多个worker,每个worker启动一个主进程,每个主进程还可以创建多个子进程,当然可以通过CELERY_CONCURRENCY设置子进程个数,也就是并发数。

我是Amir,有兴趣探讨的可以联系我QQ429771087

Gitgit@github.com:AmirHuang/amirtestcelery.git


版权声明 本文属于本站  原创作品,文章版权归本站及作者所有,请尊重作者的创作成果,转载、引用自觉附上本文永久地址: https://www.lujianxin.com/x/art/27s487xprms4

文章评论区

作者名片

图片丢失
  • 作者昵称:Amir
  • 原创文章:1篇
  • 转载文章:0篇
  • 加入本站:214天

作者其他文章

站点信息

  • 运行天数:218天
  • 累计访问:25155人次
  • 今日访问:114人次
  • 原创文章:24篇
  • 转载文章:4篇
  • 微信公众号:第一时间获取更新信息