Introducción a Celery y las colas de tareas asíncronas

Preview:

DESCRIPTION

Introducción al framework Celery y a las colas de tareas asíncronas en Python. Código fuente de los ejemplos disponible aquí: https://github.com/albertoalcolea/charla-celery

Citation preview

CeleryDistributed Task Queue

Introducción a Celery y las colas de tareas asíncronas

Alberto AlcoleaAlberto Alcolea

@albertoalcoleahttp://albertoalcolea.com

CeleryDistributed Task Queue

Recursos disponibles online

Presentación: http://slideshare.com/albertoalcolea/celery

Código fuente: https://github.com/albertoalcolea/charla-celery

¿Qué es Celery?

¡Escalar!

Advanced Message Queuing Protocol

● Protocolo de capa de aplicaciones (OSI)● Orientado a mensajes● Encolamiento de mensajes: queuing● Enrutamiento:

– Punto-a-punto– Publicación-subscripción

Casos de uso

● Tareas con alto costo computacional● I/O● Tareas periodicas: cron● Tareas lentas enviando los resultados con AJAX

¡Cualquier cosa fuera del ciclo petición-respuesta!

Arquitectura

user AMQPbroker

celeryworkers

backend

Arquitectura

user

Envía a la cola:

Tareas individuales

Conjuntos de tareas

Tareas encadenadas

Tareas periódicas

Tareas “reintentables”

Arquitectura

AMQPbroker

celeryworkers

Workers escuchan (o se

subscriben) al bus AMQP en

busca de nuevas tareas

Arquitectura

AMQPbroker

celeryworkers

Workers ejecutan tareas en

paralelo.

Prefork (multiprocessing)

Eventlet, gevent

Threads

Arquitectura

celeryworkers

backend

Resultados escritos en

un backend

RDBMS

Redis

memcached

MongoDB

Arquitectura

celeryworkers

backend

Resultados escritos en

un backend

¡O no!

Arquitectura

user backend

Lee los resultados

Brokers soportados (v3.1)

Nombre Estado Monitorización Control remoto

RabbitMQ Estable Sí Sí

Redis Estable Sí Sí

MongoDB Experimental Sí Sí

Beanstalk Experimental No No

Amazon SQS Experimental No No

CouchDB Experimental No No

Zookeeper Experimental No No

Django ORM Experimental No No

SQLAlchemy Experimental No No

IronMQ Plugin de terceros No No

Un poco de código

from celery import Celery

app = Celery('tasks', broker='amqp://')

@app.taskdef add(x, y): return x + y

Un poco de código

from celery import Celery

app = Celery('tasks', broker='redis://')

@app.taskdef add(x, y): return x + y

Un poco de código

from tasks import addadd.delay(4, 4)

Un poco de código

from celery import Celery

app = Celery('tasks', broker='amqp://', backend='redis://')

@app.taskdef add(x, y): return x + y

@app.taskdef xsum(numbers): return sum(numbers)

Un poco de código

from tasks import addresult = add.delay(4, 4)

results.ready() # True/False

results.get() # results.get(timeout=1)

result.traceback

Un poco de código

@app.taskdef tarea_costosa(x, y):

try:do_something()

except Exception, exc:raise tarea_costosa.retry(exc=exc)

Si un nodo falla podemos reenviar la tarea a la cola.

Canvas

Signatures (subtasks) Tareas llamadas dentro de otras tareas.● Definir tareas parciales

● Callbacks

● Tareas inmutables

partial = add.s(2) # incomplete signaturepartial.delay(4) # 2 + 4

add.apply_async((2, 2), link=add.s(8))

Primitivas

Group Lista de tareas que deben ser aplicadas en paralelo.

ChainTareas que se ejecutan una detrás de otra (callbacks).

from celery import groupres = group(add.s(i, i) for i in xrange(10))()res.get(timeout=1)# [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

from celery import chainres = chain(add.s(2, 2), add.s(4), add.s(8))()res.get() # 16(add.s(2, 2) | add.s(4) | add.s(8))().get()

Primitivas

Chord Group con callback. Se ejecuta cuando todas terminan

ChunksDivide lista de argumentos y ejecuta en paralelo

from celery import chordres = chord((add.s(i, i) for i in xrange(10)), xsum.s())()res.get() # 90

from celery import chunksres = add.chunks(zip(range(100), range(100)), 10)()res.get()

Primitivas

Chunks + Chord/Chain = map-reduce

@app.taskdef map(x, y):

return x + y

@app.taskdef reduce(numlist):

numbers = [item for sub in numlist for item in sub]return sum(numbers)

>>> chain(add.chunks(items, 5).group(), reduce.s())().get()9900>>> chord(add.chunks(items, 2).group(), reduce.s())().get()9900

Canvas

¡Y más!

http://docs.celeryproject.org/en/latest/userguide/canvas.html

CeleryDistributed Task Queue

¡Gracias!

Alberto AlcoleaAlberto Alcolea

@albertoalcoleahttp://albertoalcolea.com