使用Celery和Redis构建任务队列

1、flask的阻塞

flask是一个阻塞式的框架,在flask自带的服务器中,一次只能处理一个请求。所以、使用flask开发的网站后台处理网络请求也是这样的,类似于工厂的流水线生产、一个工人只有完成自己手上当前的工作,才能接着完成流水线上过来的另一个工作。

后台开发中,不可避免的会存在一些非常耗时的请求,比如一些IO操作,或者耗时的计算任务。这样导致上一个请求处理完成之前,服务器无法处理其它请求,这样就造成了阻塞。虽然增加线程的数量能在一定程度上缓解这种糟糕的情况,但当并发量非常大时,多线程就显得无能为力。所以,要增加网络的响应速度,处理好耗时的操作是关键。

    在下面的图中,显示了一台小型Linux服务器的负载情况,可以看出,在设置1GB的空间作为交换空间的时候,性能任然捉襟见肘,

1G swap

    在内存很小的情况下,为了避免Linux服务器宕机,设置更大的交换分区可以稍微解决燃眉之急,在设置2G的交换空间之后,情况得到了缓解

2G swap

    当然,最好的情况是直接升级服务器的硬件,因为将一部分硬盘设置为交换空间,也会影响磁盘的读写性能,除了硬件升级,优化程序也能带来很大的效率提升。

2、了解redis

    我们已经知道诸如Mysql,sqlserver,Mongodb这类的数据库,这些数据库都有一个共同点,都是将数据存储于硬盘。数据存储在硬盘中的优点是硬盘空间大,但是读写性能的低下也困扰着开发者,特别是对于一些请求量非常大但是变更很少的数据,读取的成本比较高,如果能进行缓存,可以提升很大的IO效率。

    有现实的需求,自然就有相应的工具实现。redis便是一个基于内存也可以持久化的数据库,将数据存储于内存中(当数据大小没有超过”swappability=age*log(size_in_memory)这个值时,超过了便写入硬盘),大大提高热点数据的读取效率。同时,它不仅支持key/value类型的数据,同时还提供了list,hash,set等数据结构的存储。redis的相关使用以后将会用单独的文章来写,这里就不详细描述了

3、Celery

    Celery是使用python编写的一个分布式系统,可以被很多语言编写的程序调用,进行异步任务队列的分发。异步,简而言之就是程序将当前要做的事情先放在后台,稍后再进行处理,缓解当前的压力。

    Celery中有一个重要的概念叫broker,我们知道,Celery中的任务是异步地移交给后台处理,那么怎么能确保系统能记住这个任务不会忘记呢,答案就是这个broker,broker便是存储任务的地方,一般使用RabbitMQ或者Redis。同时,还有一个概念是backend,Celery的异步任务是不能立即返回结果给请求的,如果需要返回结果,便可以先将结果暂存,下次请求就可以直接返回了。

    下面的图展示了任务在redis中的存储情况:
celery-meta-data

    在知道Celery的工作流之后,用个简单的列子来加深一下理解:

1
2
3
4
5
6
7
8
9
10
# -*- coding: utf-8 -*-
from celery import Celery

celery = Celery(__name__, broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0') # 创建celery对象,任务和结果都存储在redis中

@celery.task
def need_long_time():
pass # 耗时的操作

need_long_time.delay() # 调用任务

在控制台中调用后,可以看到结果是一个AsyncResult:

1
2
3
>>> from task import need_long_time
>>> need_long_time.delay()
<AsyncResult: 4dsade14e-kdjs-45hj-90as-34hjec7kj652>

4、Flask使用异步任务队列

    编写简单的Celery任务小程序很简单,但是将其集成在大型的后台项目,却会遇到一些问题。下面以集成到Flask项目中为列,介绍一下流程。

    由于我的项目使用了抽象工厂的模式,Celery如何在Flask程序中初始化并且如何进入程序上下文,便成为了一个问题,而且Celery只能处于程序上下文之中,在第一次构建之后,使用下面的命令启动单独启动Celery:

1
bash: celery worker -l INFO -A app.main.create_celery.celery # 依据情况替换文件名

报了下面的错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 228, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 415, in __protected_call__
return self.run(*args, **kwargs)
File "/home/ryan/www/CG-Website/src/util/mail.py", line 28, in send_forgot_email
msg = Message("Recover your Crusade Gaming Account")
File "/usr/lib/python2.7/site-packages/flask_mail.py", line 178, in __init__
sender = current_app.config.get("DEFAULT_MAIL_SENDER")
File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 336, in __getattr__
return getattr(self._get_current_object(), name)
File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 295, in _get_current_object
return self.__local()
File "/usr/lib/python2.7/site-packages/flask/globals.py", line 26, in _find_app
raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context

    这个错误一看便吻合了我们刚才说的Celery要运行于项目的程序上下文中,经过查阅,Flask给我们提供了相关的操作,因为python的一个进程可以运行多个应用,此时的celery作为一个应用,需要将它绑定到进程中,:

1
2
3
4
5
6
7
8
9
10
11
12
13
# -*- coding: utf-8 -*-
"""
celery_worker
~~~~~~~~~~~~

为celery创建app_context
app: 项目名

"""
from app import create_app, celery

app = create_app()
app.app_context().push()

    做完这一步,根据项目的文件存放位置不同,除了注意这个working outside of application context的坑,还要注意python循环导包的问题,因为有celery的项目文件构成一般较复杂,稍不注意,便容易诱发这个问题。

    上面对工具链和相关问题进行了分析,下面看看运行成功的情况。

    启动Celery程序之后,控制台下会打印启动信息(出错的话错误信息也会在这里打印):
celery-start

    Celery接收任务:
celery-recive-task

    Celery接收多个任务,并完成一些任务,任务完成后会在控制台打印信息,当然在服务器部署的话会打印到log文件:
celery-revice-mult-task

5、参考

热评文章