足球盘口软件
当前位置: 足球盘口软件 > 前端 >
异步任务利器Celery,Celery学习笔记

Celery 快速入门

Celery

Celery (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

        Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具。

django项目开发中遇到过一些问题,发送请求后服务器要进行一系列耗时非常长的操作,用户要等待很久的时间。可不可以立刻对用户返回响应,然后在后台运行那些操作呢?

任务队列

任务队列用于分发工作给不同线程或机器。

Celery通过消息传递

支持多个workers和brokers。提供高可用和水平扩展性。

用Python写的

Celery架构

图片 1

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件: Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

任务执行单元 :Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储 :Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 另外, Celery还支持不同的并发和序列化的手段。

并发 :Prefork, Eventlet, gevent, threads/single threaded

序列化 :pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等

        Celery 专注于实时任务处理,支持任务调度。

crontab定时任务很难达到这样的要求 ,异步任务是很好的解决方法,有一个使用python写的非常好用的异步任务工具Celery。

优点

  • 简单
  • 高可用
  • 易扩展

安装

pip install celery

        说白了,它是一个分布式队列的管理工具,我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。

broker、worker和backend

Celery的架构由三部分组成,消息中间件(broker),任务执行单元(worker)和任务执行结果存储(result backends)组成。

应用程序调用Celery的时候,会向broker传递消息,而后worker将会取到消息,对程序进行执行,backend用于存储这些消息以及Celery执行的结果。

消息中间件broker

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。

RabbitMQ是最好的消息中间件,使用方法如下:

Using RabbitMQ

Redis也是可行的,虽然有信息丢失的风险:

Using Redis

其余broker Broker Overview。

任务执行单元worker

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储backend

用来存储Worker执行的任务的结果:SQLAlchemy/Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP)。

支持

  • Brokers

    • RabbitMQ, Redis
    • MongoDB, ZeroMQ
    • CouchDB, SQLAlchemy
    • Django ORM, Amazon SQS,
    • ...
  • Result Stores

    • AMQP, Redis
    • memcached, MongoDB
    • SQLAlchemy, Django ORM
    • Apache Cassandra
  • Concurrency

    • prefork
    • Eventlet, gevent
    • threads/single threaded
  • Serialization

    • pickle, json, yaml, msgpack.
    • zlib, bzip2 compression.
    • Cryptographic message signing

选择消息中间件(Broker)

消息中间件有多种选择,本文使用redis。其他的消息中间件的使用方法参考官方文档。

redis的配置形式如下

redis://:password@hostname:port/db_number

例如:

BROKER_URL = 'redis://localhost:6379/0'

(Celery 本身不是任务队列,它是管理分布式任务队列的工具,或者换一种说法,它封装好了操作常见任务队列的各种操作,我们用它可以快速进行任务队列的使用与管理,当然你也可以自己看 rabbitmq 等队列的文档然后自己实现相关操作都是没有问题的。)

下载

下载Celery很简单:

$ pip install celery

这里使用Redis作为broker实践一下,需要额外的库支持,可以一起下载:

$ pip install -U "celery[redis]"

安装

pip install celery

pip install beanstalkc

应用

创建一个应用,broker使用redis:

图片 2

使用命令:

$ celery -A tasks worker --loglevel=info

图片 3

对于celery的命令可以查看celery --help以及 celery worker --help

执行结果:

图片 4

发送一个任务:

图片 5

执行结果:

图片 6


写应用程序

写一个简单的应用tasks.py:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

选择Broker

$ sudo apt-get install rabbitmq-server

保存结果

如果需要保存结果的话,需要给celery配置一个backend。

图片 7

执行:

图片 8

 Celery架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

运行worker

在命令行中运行:

$ celery -A tasks worker --loglevel=info

输出如下:

[2017-09-10 06:59:58,665: INFO/MainProcess] Connected to redis://localhost:6379/0
[2017-09-10 06:59:58,671: INFO/MainProcess] mingle: searching for neighbors
[2017-09-10 06:59:59,688: INFO/MainProcess] mingle: all alone
[2017-09-10 06:59:59,724: INFO/MainProcess] celery@ubuntu ready.

设置Broker

$ sudo rabbitmqctl add_user myuser mypassword

$ sudo rabbitmqctl add_vhost myvhost

$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

Django使用celery

使用的是django-celery包:

pip install django-djcelery

图片 9

发送任务

进入python环境:

>>> from tasks import add
>>> add.delay(4, 4)

worker里就可以看到任务处理的消息:

[2017-09-10 07:02:34,874: INFO/MainProcess] Received task: task.add[40ec89c8-0a23-4a26-9da0-7f418c50f4cb]  
[2017-09-10 07:02:34,876: INFO/ForkPoolWorker-1] Task task.add[40ec89c8-0a23-4a26-9da0-7f418c50f4cb] succeeded in 0.000579041981837s: 8

应用

首先你要创建Celery实例。

from celery import Celery

BROKER_URL = 'amqp://guest:[email protected]:5672//'

app = Celery('tasks', broker=BROKER_URL)

@app.task
def add(x, y):
    return x + y

配置djcelery

下面是djcelery的有关配置,定义在Django项目的settings模块内。
首先INSTALLED_APPS之中需要写入djcelery。

图片 10

图片 11

说明:直接使用django做broker生产环境不建议,建议使用redis或者rabbitMQ
需要在settings.py之中载入celery的配置。

图片 12

配置好了celery之后需要创建Celery所需要的数据表(django1.7)

python manage.py migrate

存储结果

使用Redis作为存储backend,在tasks.py中修改:

app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')

运行后就可以查看结果了:

>>> from task import add
>>> r=add.delay(3,4)
>>> r.result
7
>>> r.ready()
True
>>> r.get(timeout=1)
7

可以查看Redis中的存储:

127.0.0.1:6379> get celery-task-meta-f2032d3e-f9a0-425d-bce4-f55ce58c8706
"{"status": "SUCCESS", "traceback": null, "result": 7, "task_id": "f2032d3e-f9a0-425d-bce4-f55ce58c8706", "children": []}"
127.0.0.1:6379> 

超级简单,想要对Celery做更进一步的了解,请参考官方文档。

执行celery worker服务器

$ celery -A tasks worker --loglevel=info

通过使用supervisord守护进程执行

消息中间件

        Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

调用task

>>> from tasks import add
>>> add.delay(4, 4)

任务执行单元

        Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

取得结果

app = Celery('tasks', backend='amqp', broker='amqp://')


>>> result = add.delay(4, 4)
>>> result.ready()
True
>>> result.get(timeout=1)
8

任务结果存储

        Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache


More

快速入门 任务队列 任务队列用于分发工作给不同线程或机器。 Celery通过消息传递 支持多个workers和brokers。提供高可用和水平扩展性。...

定义一个task

from  celery  import  Celery

app = Celery('tasks', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//')

@app.task

def  add(x, y):

    return  x + y

其中,broker 就是中间件了;backend就是后端来发送状态消息,保持追踪任务的状态,存储或发送这些状态

返回顶部