Celery + django

Celery + django
Escrito a year ago

Celery es una aplicación que nos permites realizar tareas tasks de manera asíncrona. Es decir, podremos lanzar procesos para que se ejecuten en nodos de trabajo, de manera concurrente, evitando así sobrecargar el proceso principal. Algo ideal para aplicaciones donde haya numerosas interacciones con el usuario, por ejemplo, una aplicación web. También podemos lanzar tareas síncronas y programar tareas de python para que se ejecuten de manera periódica en los nodos de trabajo.

Message brokers

Los message brokers son los programas que reciben y ejecutan las tareas de manera distribuida. Hay que recordar que Celery no es un servidor de tareas, sino una librería que se comunica con los servidores de tarea; un puente entre python y cualquier servidor de tareas. En este sentido, Celery es bastante agnóstica. Para el programador que crea las tareas, el servidor que las gestionas es irrelevante. Simplemente creamos las tareas, se las pasamos a Celery y esta se encarga de pelearse con el servidor de tareas.

Por último mencionar que Celery soporta de manera estable los siguiente servidores de tareas:

Vistazo rápido.

Para ver como instalar, configurar y utilizar Celery vamos a crear un pequeño ejemplo, que consistirá en un script de python que ejecute tareas de manera asíncrona.

Datos de las versiones

  • Celery 4.1.0

  • Python 3.6

  • Redis 4.0.2

Redis.

Lo primero que debemos hacer es instalar un servidor de tareas, en este caso yo he elegido Redis. Podéis descargarlo de esta página. Una vez instalado, abrimos un terminal y ejecutamos el siguiente comando para iniciarlo.

Código
1
$ redis-server

Para comprobar que el servidor está activo, abrimos un nuevo terminal (no cerrar el terminal donde hemos iniciado el servidor) y ejecutamos el comando:

Código
1
$ redis-cli ping

Si la respuesta que obtenemos es PONG, es que el servidor está online y listo para recibir tareas.

Instalar librerías de python.

Ahora necesitamos instalar las librerías que vamos a utilizar en nuestro ejemplo

Es recomendable primero crear un entorno de trabajo en python utilizando virtualenv. Ver esta entrada para instalar el virtualenv

Como vamos a utilizar Redis, es necesario previamente instalar esta librería.

Código
1
$ pip3 install redis

Básicamente es una librería que permite manipular y controlar Redis desde python.

Ahora instalamos Celery.

Código
1
$ pip3 install celery

Ahora ya tenemos todo listo para empezar a trabajar con Celery.

Crear los ficheros

El primer paso es crear un fichero donde se iniciará la aplicación de Celery y en donde crearemos las tareas.

tasks.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from celery import Celery

# Creamos la aplicación de Celery.
# El primer parámetro es el nombre de nuestro módulo
# El parámetro broker es la url del broker.
# En nuestro caso, la url es: redis://localhost:6379/0
app = Celery('tasks', broker='redis://localhost:6379/0')

# Creamos una tarea llamada add usando el decorador @app.task
@app.task
def sum_num(x, y):
return x + y

# Creamos una tarea llamada hello
@app.task
def hello(name):
return 'Hello %s' % name

Como veis para crear una tarea solo hay que añadir el decorador @app.task a una función de python.

Lo siguiente que tenemos que hacer, es pasar las tareas que acabamos de crear a Redis. Para ello, ejecutamos el siguiente comando en un nuevo terminal.

Código
1
$ celery -A tasks worker -l info
  • -A tasks Especificamos el módulo o archivo donde se encuentra la aplicación de Celery.

  • worker Decimos a Celery que queremos crear un worker con las tareas del módulo

  • -l info Sirve para especificar el nivel de los mensajes de información que aparecen.

El comando celery es muy potente ya posee grandes funcionalidades para informarse y manejar todos los aspectos del broker. Pero tiene limitaciones si se usa con AmazonSQS.

Ahora necesitamos crear el archivo donde ejecutaremos nuestras tareas recién creadas.

execute.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# Importamos las tareas del archivo tasks.
from tasks import hello, sum_num

# Para ejecutar una función en forma de tarea lo tenemos que hacer de esta forma:
hello.delay('world')
# El parámetro de la función `hello` se pasa a través de la función`delay`

# Ejecutamos la tarea `sum_num`
sum_num.delay(3, 2)

# NOTA:
# Aunque hemos convertido la función `hello` en una tarea.
# Todavía la podemos usar como una función normal de python.
print(hello('world'))

Con esto el ejemplo ya está terminado. Ahora solo hay que ejecutar este archivo para ver los resultados.

Código
1
$ python execute.py

Empezaremos a ver como aparecen los resultados de las tareas el terminal donde tenemos iniciado el worker.

En este punto del ejemplo deberíamos tener 3 terminales abiertos:

  • El terminal con el servidor de Redis.

  • El terminal con el worker.

  • El terminal donde estamos ejecutando el archivo execute.py

Usar Celery en Django

En este punto quizás ya te hayas dado cuenta lo útil que puede ser Celery en un proyecto de Django. Pero si todavía hay alguien que se pregunta, "de que me vale ejecutar tareas asíncronas en mi proyecto web". Os voy a mostrar algunos ejemplos:

  • Procesar tareas secundarias o complejas fuera del proceso principal, evitando retrasar la respuesta que le llega al usuario.

  • Envío de emails masivos.

  • Indexar datos en un motor de búsqueda. Haystack

  • Procesar múltiples ficheros o de gran tamaño.

  • Programar tareas que se ejecuten cada cierto tiempo. Por ejemplo tareas de mantenimiento.

Estos son solo algunos ejemplos de lo que podemos hacer con Celery en una aplicación web.

Ejemplo de Celery con Django

Vamos a probar Celery + Django, creando un ejemplo simple. Básicamente será una vista que ejecute una tarea de Celery.

Lo primero es crear un proyecto con Django, en este ejemplo, el proyecto ser llamará proj. Lo siguiente es crear dentro de nuestro módulo proj (La carpeta que tiene el archivo settings.py) un archivo llamado celery.py. Aquí será donde iniciaremos Celery.

celery.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Establecer las opciones de django para la aplicación de celery.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

# Crear la aplicación de Celery
app = Celery('proj')

# Especificamos que las variables de configuración de Celery se encuentran
# en el fichero `settings.py` de Django.
# El parámetro namespace es para decir que las variables de configuración de
# Celery en el fichero settings empiezan por el prefijo *CELERY_*
app.config_from_object('django.conf:settings', namespace='CELERY')

# Este método auto-registra las tareas para el broker.
# Busca tareas dentro de todos los archivos `tasks.py` que haya en las apps
# y las envía a Redis automáticamente.
app.autodiscover_tasks()

Con esta configuración, podemos añadir la configuración de Celery en el archivo settings.py de Django. Y además gracias al método autodiscover_tasks() Celery buscará automáticamente todos los archivos tasks.py en las aplicaciones de Django y añadirá todas las tareas que encuentre en estos archivos a Redis.

El siguiente paso es modificar el archivo settings.py para añadir las opciones de configuración de Celery

settings.py
1
2
3
# Añadimos la siguiente opción
# url del broker al que se conectará celery.
CELERY_BROKER_URL='redis://localhost:6379/0'

También debemos modificar el archivo __init__.py de la carpeta proj

__init__.py
1
2
3
4
5
6
7
from __future__ import absolute_import, unicode_literals

# Esto sirve para que la aplicación de Celery se creé siempre
# que Django se inicie.
from .celery import app as celery_app

__all__ = ('celery_app',)

Creamos el fichero tasks.py dentro del módulo proj con nuestra tarea.

tasks.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from time import sleep

# El decorador shared_task sirve para crear tareas independientes a la app.
# La tarea solo es una simulación tonta.
# Pero sabed que podéis usar cualquier librería y clase aquí. Incluido el orm para acceder a la bd.
@shared_task
def simulate_send_emails(num_emails):
for i in range(1, num_emails):
print('Sending email %d' % i);
# esperamos un segundo.
sleep(1)

return 'Emails sended'

Ahora vamos a crear una vista y añadirla a las urls para probar nuestra tarea. Nada del otro mundo.

views.py
1
2
3
4
5
6
7
from django.http import HttpResponse
from proj.tasks import simulate_send_emails

# Vista que llama a la tarea.
def send_emails(request):
simulate_send_emails.delay(10)
return HttpResponse('Emails sended using Celery')
urls.py
1
2
3
4
5
6
7
8
9
from django.conf.urls import url
from django.contrib import admin
from .views import send_emails

urlpatterns = [
url(r'^admin/', admin.site.urls),
# url que ejecuta la tarea.
url(r'^send_emails$', send_emails)
]

En este punto, deberíamos tener algo así:

Árbol de directorio de nuestro proyecto de Django
1
2
3
4
5
6
7
8
9
- manage.py
- proj
|---- __init__.py
|---- settings.py
|---- celery.py
|---- tasks.py (Celery busca automáticamente, en todos los ficheros tasks.py para ver si hay tareas que añadir al worker)
|---- views.py
|---- urls.py
|---- wsgi.py

Por último ejecutamos los workers. Este comando añadirá todas las tareas que encuentre a Redis.

Código
1
$ celery -A proj worker -l info

proj es el nombre del paquete donde se encuentra la aplicación de Celery.

Con este simple ejemplo, hemos creado una vista de Django que ejecuta una tarea de Celery. Bastante sencillo, ¿verdad?

Monitorizar tareas

El problema de las tareas, es que se ejecutan en un segundo plano, no sabemos si una tarea ha terminado, está pendiente o ha fallado. A no ser que accedamos al servidor de Redis y lo comprobemos utilizando sus comandos de consola. Por suerte existen paquetes que permiten monitorizar las tareas de Celery y guardar los resultados en la base de datos de nuestra aplicación de Django. Así podremos comprobar en cada momento, que tareas se están ejecutando, y cuales han fallado.

La monitorización de tareas por parte de Celery no funciona con AmazonSQS

Uno de de esos paquetes es django-celery-monitor Ver repo. Vamos a probarlo. Primero instalar el paquete.

Código
1
$ pip install django-celery-monitor

Añadimos la nueva aplicación al archivo settings.py de nuestro proyecto

settings.py
1
2
3
INSTALED_APPS += [
'django_celery_monitor'
]

Hacemos una migración de la base de datos para añadir los modelos de la nueva app.

Código
1
$ python manage.py migrate celery_monitor

Ahora ejecutamos un comando de Celery que nos permite monitorizar los estados de las tareas y de los workers.

Código
1
$ celery -A proj events -l info --camera django_celery_monitor.camera.Camera --frequency=2.0
  • -A proj Especificamos el módulo donde se encuentra la aplicación de Celery.

  • events Indica que queremos iniciar un evento de Celery.

  • -l info Sirve para especificar el nivel de los mensajes de información que aparecen.

  • --camera Especificamos la librería encargada de gestionar los datos de la monitorización, en nuestro caso, es una clase de la librería que hemos instalado. django_celery_monitor.camera.Camera

  • --frequency=2.0 Intervalo, en segundos, en el que el programa comprueba el estado de los workers de Redis. Básicamente el evento se ejecutará cada 2 segundos para ver los estados de las tareas y de los workers.

Y por último volvemos a ejecutar los workers, pero con un pequeño cambio

Código
1
$ celery -A proj worker -l info -E

El parámetro -E Sirve para decir que el worker va a ser monitorizado.

Con todo esto ahora podemos ver los estados de las tareas que se están ejecutando, desde nuestro panel de administración de Django.

Captura de django_celery_monitor

Más cosas

Volviendo al primer ejemplo. El de las tareas hello y sum_num

Más cosas
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from tasks import hello, sum_num
from datetime import datetime, timedelta

# Esto es otra forma de ejecutar tareas de manera asíncrona.
# Los parámetros de la función se pasan en forma de lista en la función apply_async
sum_num.apply_async((4,5))

# Podemos especificar el número de segundos que debe esperar antes de ejecutarse.
sum_num.apply_async((3, 8), countdown=3)
# En este caso hemos dicho que espere 3 segundos antes de ejecutarse.

# También podemos usar fechas y horas para especificar cuando queremos lanzar la tarea
in20seconds = datetime.utcnow() + timedelta(seconds=20)
sum_num.apply_async((1,1), eta=in20seconds)

# Podemos utilizar tareas encadenadas. *pipeline*.
sum_num.apply_async((4, 4), link=sum_num.s(10))
# con el parámetro link podemos encadenar varias tareas.
# El resultado de ejecutar la primera tarea sum_num(4, 4)se enviará como primer parámetro
# a la tarea del parámetro link El resultado: (4 + 4) + 10

# Aplicar un tiempo de ejecución máximo para evitar que una tarea se quede
# indefinidamente ejecutándose por error, y acabe colapsando el servidor.
hello.apply_async(('World',), expires=30)
# La tarea se cancelará en 30 segundos si no se completa.
# También podemos utilizar fechas.

# Establecer reintentos
# Podemos reintentar ejecutar una tarea si esta falla o expira su tiempo
sum_num.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 5,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
# - max_retires: Número de intentos máximos
# - interval_start: Tiempo que pasa entre el primer y segundo intento.
# - interval_step: tiempo que se va añadiendo entre los demás intentos.
# Entre el segundo y el tercero espera 0.2 Entre el tercero y cuarto 0.4 y así sucesivamente
# - interval_max: Tiempo máximo de espera entre intentos.

Tareas periódicas

Podemos programar tareas que se ejecuten cada determinado tiempo.

tasks.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from celery import Celery
from celery.schedules import crontab

# Configuración de celery
app = Celery()

# Configuramos la zona horaria para que las tareas se ejecuten a la hora correcta.
app.conf.timezone = 'Europe/Madrid'

# Decorador para añadir configuración extra después de conectarse.
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Ejecuta la tarea test('hello') cada 10 segundos
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

# Ejecuta la tarea test('world') cada 10 segundos
sender.add_periodic_task(30.0, test.s('world'), expires=10)

# Ejecuta la tarea cada lunes a las 7:30 am
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)

@app.task
def test(arg):
print(arg)

# También podemos añadir tareas periódicas fuera de la configuración inicial.
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.test',
'schedule': 30.0,
'args': ("World",)
},
}

También existen apps para Django que gestiona desde el panel de administración tareas programadas.

Y aun más cosas.

  • Crear colas con distintas prioridades para las tareas.

  • Tareas para controlar errores de otras tareas.

  • Definir el número máximo que una tarea se puede ejecutar en un determinado tiempo.

  • Configuración para múltiples servidores de tareas.

  • Gestión del progreso y del estado de una tarea.

En conclusión Celery es una poderosa herramienta que combina muy bien con Django y con otros frameworks de python. No es de extrañar que sea tan popular y que se utilice en miles de aplicaciones web.

Comentarios