
Flask项目集成Celery
1. 前言
之前在做公司一个项目的时候,需要将前端数据提交到服务器,由服务器进行计算,将计算结果返回给前端。由于存在部分任务计算时间过长,因此会导致前端在发送计算请求后,在请求的限制时间内并不能得到计算的结果。此时,要想实现快速响应客户端的需求,就需要考虑使用任务队列了。
任务队列是一个单独的程序,与网站没关系。只是任务队列提供了接口,在网站中通过代码操作任务队列,比如:添加任务,查看任务结果等。
2. Celery介绍
Celery 是一个简单、灵活、可靠的分布式系统,用于处理大量消息,同时提供维护此类系统所需的工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
官方文档:Introduction
因为是使用python项目开发,所以安装Celery
的命令如下:
xxxxxxxxxx
11pip install celery
3. Celery工作原理
3.1 生产者消费者模式
在介绍Celery
工作原理之前,需要简单介绍一下生产者消费者模式
。
在实际的软件开发过程中,经常会碰到如下场景:
某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。
产生数据的模块,就形象地称为生产者(
Producer
);而处理数据的模块,就称为消费者(Consumer
)。
生产者消费者模式
是通过一个容器来解决生产者和消费者的强耦合问题。生产者(Producer
)和消费者(Consumer)彼此之间不直接通讯,而通过消息队列(缓冲区Buffer
)来进行通讯。
简单来说:生产者(Producer
)生产完数据之后不用等待消费者(Consumer
)处理,直接扔给消息队列,消费者(Consumer
)不找生产者(Producer
)要数据,而是直接从消息队列里取,消息队列就相当于一个缓冲区(Buffer
),平衡了生产者(Producer
)和消费者(Consumer
)的处理能力。
举个通俗的例子:
假如你非常喜欢吃包子(吃起来根本停不下来),今天,你妈妈(生产者)在蒸包子,厨房有张桌子(缓冲区),你妈妈将蒸熟的包子盛在盘子里,然后放到桌子上,你在看电视,看到桌子上有包子,你就会拿走。
此时,你和你妈妈使用了同一个桌子,这个桌子就是共享对象。生产者用于添加包子,消费者用于取走包子。好处是,你妈妈(生产者)不用直接把包子给你,只需要把包子放在桌子上,如果桌子满了,就不再放了,等待。即使有多个消费者也无所谓。
这样的好处是:生产者不用关注哪个消费者拿了包子,而消费者只需要关注桌子上是否有包子,没有的话就等待。
3.2 Celery原理
理解了生产者消费者模式
之后,介绍celery的原理。
Celery的架构由三部分组成,消息中间件(message broker
),任务执行单元(worker
)和任务执行结果存储(task result store
)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件(Broker
)集成,包括,RabbitMQ, Redis, MongoDB等。
任务执行单元
Worker
是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store
用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果。
工作原理如上图所示:
Celery
支持手动发布任务,也支持定时任务。不管任务从哪里来,会先把任务存放到Broker
(中间人)中。接着Celery
会生成worker
,来从Broker
中读取任务执行。执行完成后,再把执行后的结果存放到Backend
中。
- 其中,常用作
Celery Broker
的有Redis
、RabbitMQ
、数据库
等,其中Redis
和RabbitMQ
的稳定性和效率是最高的。 - 常用作
Celery Backend
的有Redis
和数据库
。
Celery
官方推荐的Broker
和Backend
搭配为:RabbitMQ(Broker
)+Redis(Backend)
。
但我们平常一台服务器上运行Redis
和RabbitMQ
有点太浪费了,而且RabbitMQ
只有在消息非常大的时候才能体现优势,因此一般我们直接使用Redis
作为Broker
和Backend
即可。
4. Redis安装
Redis官方是只有Linux版本的,可以根据官方文档直接下载:https://redis.io/download,或者使用带有Redis的Docker即可。
这里不推荐使用Windows安装Redis。
5. Flask中使用Celery
Celery
和Redis
都安装成功后,就可以在Flask
中集成Celery
了。当然Flask
官方文档也描述了如何集成Celery
,但是那种方式不适合现实中大型项目结构的,很容易引起循环引用的问题。这里我的结构如下:
xxxxxxxxxx
61|- project
2|-- apps
3|--- auth
4|---- views.py
5| -- app.py
6| -- mycelery.py
其中和celery有关系的文件如下:
- mycelery.py:创建celery对象,并且添加了任务。
- app.py:对celery进行app绑定。
- views.py:调用celery中的任务。
就以发送邮件为例,我的这三个文件的代码如下:
mycelery.py:
xxxxxxxxxx
331from flask_mail import Message
2from exts import mail
3from celery import Celery
4
5
6# 定义任务函数
7def send_mail(recipient, subject, body):
8 message = Message(subject=subject, recipients=[recipient], body=body)
9 mail.send(message)
10 return {"status": "SUCCESS"}
11
12
13# 创建celery对象
14def make_celery(app):
15 celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
16 broker=app.config['CELERY_BROKER_URL'])
17 TaskBase = celery.Task
18
19 class ContextTask(TaskBase):
20 abstract = True
21
22 def __call__(self, *args, **kwargs):
23 with app.app_context():
24 return TaskBase.__call__(self, *args, **kwargs)
25
26
27 celery.Task = ContextTask
28 app.celery = celery
29
30 # 添加任务
31 celery.task(name="send_mail")(send_mail)
32
33 return celery
app.py:
xxxxxxxxxx
41from mycelery import make_celery
2
3# 构建celery
4celery = make_celery(app)
views.py:
xxxxxxxxxx
151from flask import current_app, jsonify
2
3
4route("/mail") .
5def mail_test():
6 try:
7 email = request.args.get("mail")
8 subject = "邮箱主题"
9 body = "邮箱内容"
10 current_app.celery.send_task("send_mail", (email, subject, body))
11 return jsonify("success")
12 except Exception as e:
13 print(e)
14 return jsonify("fail")
15
6. 运行Celery
打开cmd终端,然后输入以下命令即可运行Celery:
xxxxxxxxxx
11celery -A app.celery worker --loglevel=info -P gevent
其中的celery -A
是固定写法,app
代表我的app.py
模块,celery
代表我的app.py
中的celery
对象,--loglevel
代表日志级别。
以上即成功运行了Celery
,我们访问发送邮件的URL(/mail),即可成功使用celery
异步发送邮件了。
7. 访问任务结果
在实际业务场景中,经常需要查看某个任务的执行状态。比如算法计算,我们会在客户端查看是否计算完成。这时候可以定义一个查看任务状态的URL,将任务状态返回。比如:
xxxxxxxxxx
231route('/status/<task_id>') .
2def taskstatus(task_id):
3 task = long_task.AsyncResult(task_id)
4 if task.state == 'PENDING':
5 # 任务还没开始
6 response = {
7 'state': task.state,
8 'status': '排队中...'
9 }
10 elif task.state != 'FAILURE':
11 response = {
12 'state': task.state,
13 'status': task.info.get('status', '')
14 }
15 if 'result' in task.info:
16 response['result'] = task.info['result']
17 else:
18 # 如果不是PENDING,或者SUCCESS,那么可能是出现异常了
19 response = {
20 'state': task.state,
21 'status': str(task.info), # 返回错误信息
22 }
23 return jsonify(response)
至此,结束。