【celery】异步组件

1. Celery概念
Celery是处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
Celery架构由三部分组成:
消息中间件
Celery本身不提供消息服务,但是可以和第三方消息中间件集成,例如RabbitMQ、Redis等,
任务执行单元
任务执行单元并发地运行在分布式的系统节点中。
任务执行结果存储
用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP、redis等。
这三部分的关系如下图所示:

Celery使用场景:
异步任务
将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等。
定时任务
定时执行某件事情,比如每天数据统计。
Celery安装
2. Celery使用
环境信息:
平台: win10, redis 6.0.16是部署在ubuntu虚拟机上的
python 3.8
celery 5.2.7
django 4.1.4
2.1 单目录结构
2.1.1 消费者
创建celery任务
执行celery任务:
windows环境:
-A 表示应用 ,20_1_demo是应用文件名称;
worker表示启动任务实例;
-l info表示输出的日志级别为info
-P 指定使用eventlet(绿色线程,待了解)
-c 并发数
linux环境:
注: 在windows上使用linux命令启动celery监听后,发现生产者发送过来的消息,celery任务一直运行不起来,需要指定-P eventlet才能正常运行。
这个命令做的事情:
连接消息中间件,创建消息队列,监听消息队列
创建多个worker,监听任务
这个时候消费者任务(send_email、send_msg)还没有运行,只是扫描监听到了,运行结果:

下图是redis2号库里生成的数据:

2.1.2 生产者
向消息队列中添加消息,消息队列中保存的内容是:函数名和参数
运行之后将两个任务的id输出,这个时候并不能保证任务运行结束了,生产者只需要把消息发送到消息队列里即可,任务运行成功或失败需要查询才能知道。

查询任务是否运行成功:
运行结果:

由第1章的Celry原理图,可以知道,任务运行成功之后会将结果保存到数据库里,这儿打印出来的ok是从数据库读取出来的,查看下redis1号库中的内容,发现程序输出结果和库里的内容保持一致,输出结果还保存了其他信息,比如:任务状态、任务完成时间、任务id等。

2.2 多目录结构
多目录结构和单目录结构的区别就是:celery任务放在了不同的文件里。
下面是例子:

启动方式:
需要进入到目录celery_tasks上一层目录执行,-A指定的应用为目录名称celery_tasks
消费者启动之后,生产者和2.1.2章内容没什么区别。
生产者将消息存入到消息队列后,任务执行完毕,从终端输出可以看出celery任务输出内容和返回结果:

3. Celery简单结构下的定时任务
3.1 方式一:自定义几点几分运行
Celery消费者任务要提前启动,等待生产者推送消息。生产者定时推送消息使用的是函数apply_async。
函数apply_async包含如下参数:
args: celery任务接受的参数,以列表形式展示,例如下方代码中send_email任务需要接收一个参数name。
eta: 定时时间,下方代码中指定时间,再转换为时间戳,再转换为国标时间,传递给参数eta,Celery任务会在指定时间调用。
代码:

3.2 方式二:指定多长时间后运行任务
使用datetime.timedelta()做时间间隔定时。

4. Celery多目录结构下的定时任务
4.1 定时任务配置
第2章中多目录结构配置文件celery.py中配置定时任务:
注意:beat_schedule千万不要写为beat_scheduler
调度器beat_schedule每个子元素都是一个定时任务,子元素的key是定时任务的名称,可以随意写;子元素的内容里的key是固定的,不可以改。
定时方式设置:
timedelta(seconds=10): 每隔10秒钟调用一次
crontabl(minute="*/2"): 每隔2分钟调用一次。
定时任务配置代码参考:
4.2 work启动
work就是消费者,把Celery任务准备好之后,等待生产者往消息队列里插入数据后,读取消息进行消费(运行Celery任务)。
如下图所示:celery beat启动之后,每隔2分钟调用一次任务。

4.3 celery beat启动定时任务
定时任务是生产者的角色,按配置的时间将任务消息(Celery任务函数名称、参数、任务id等信息组成的消息)插入消息队列。

5. Django中使用Celery完成异步和定时任务
先看下目录结构:
Django项目创建之后,创建celery消费者目录,这里面存放Celery任务。

5.1 创建celery配置
5.2 创建celery任务
任务包名email和sms可以任意修改,但是报名下的tasks.py不可以修改,celery识别任务时会根据tasks.py进行任务扫描。

5.3 创建celery实例并启动
这一部分是创建Celery实例应用,把celery的配置config.py和任务加载进来。
启动Celery任务命令:
5.4 Django视图调用Celery任务
此时的Django视图相当于生产者,当访问到test()视图时,会调起Celery任务(就是把任务消息放到消息队列里),消费者端会根据任务类型(异步任务/定时任务)执行对应的Celery任务。
访问之后,消费者端可以成功接收任务并执行:

6. 参考资料
B站视频: BV1tq4y187k5 (推荐)
Celery官方文档: https://docs.celeryq.dev/en/stable/index.html (辅助)
练习代码上传至:https://gitee.com/atiaisi/py_learn/tree/master/20_Celery%E5%BC%82%E6%AD%A5%E6%A1%86%E6%9E%B6