Apio para principiantes / Sudo Null IT News

¡Hola Habr!

Apio – Este cola de tareas distribuida asíncrona, escrito en Python, está diseñado para procesar mensajes en tiempo real mediante multitarea. Usando apio, Puedes organizar tareas para ejecutarlas en segundo plano.sin cargar el hilo principal de la aplicación.

Con Celery, puedes organizar fácilmente las tareas en segundo plano.

La instalación se realiza mediante pip.

Características clave del apio

Definiendo tareas

Creemos una instancia de Celery en un archivo. celery_app.py:

from celery import Celery

app = Celery('example', broker="your_broker_url_here")

Puedes definir tareas en segundo plano usando un decorador. @app.task. Por ejemplo, una función que simplemente suma dos números:

@app.task
def add(x, y):
    return x + y

add Es una tarea asincrónica. Puedes llamarlo usando add.delay(x, y).

Apio ofrece opciones para configurar tareas:

ignorar_resultado

Si no necesita el resultado de la tarea, hay un parámetro ignore_result:

@app.task(ignore_result=True)
def add(x, y):
    return x + y

rate_limit Limita la velocidad a la que se pueden completar las tareas. Por ejemplo, si necesita la tarea add ejecutado más de 10 veces por minuto, se puede configurar rate_limit:

@app.task(rate_limit="10/m")
def add(x, y):
    return x + y

rever

A veces la tarea puede fallar debido a problemas (que siempre suceden):

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def add(self, x, y):
    try:
        # Попытка выполнения задачи
        return x + y
    except SomeTemporaryException as exc:
        # Запланировать повторное выполнение задачи
        raise self.retry(exc=exc)

La tarea intentará ejecutarse hasta 3 veces en intervalos de 60 segundos si ocurre un error temporal.

Llamar tareas

Método delay() es la forma más sencilla de llamar a una tarea de forma asincrónica. Debajo del capó que usa apply_async():

from tasks import add

# Вызов задачи асинхронно
result = add.delay(4, 4)

apply_async() ofrece más flexibilidad al permitirle especificar varios parámetros de ejecución: tiempo de ejecución y prioridad, así como devoluciones de llamada y errores:

result = add.apply_async((4, 4), countdown=10)

add se programará para ejecutarse 10 segundos después de la llamada.

signature() crea una firma de tarea que se puede utilizar para crear trabajos complejos. procesos:

from celery import signature

sig = signature('tasks.add', args=(2, 2), immutable=True)
sig.delay()

chain() le permite conectar varias tareas en una secuencia, donde el resultado de una tarea se pasa como argumento a la siguiente:

from celery import chain

# (4 + 4) -> (8 * 10)
res = chain(add.s(4, 4), multiply.s(10))()

group() Se utiliza para ejecutar un conjunto de tareas en paralelo. Devuelve un objeto especial GroupResultque le permite rastrear la ejecución de un grupo de tareas:

from celery import group

# выполняет add(2, 2) и add(4, 4) параллельно
group_result = group(add.s(2, 2), add.s(4, 4))()

chord() es una combinación group() y chain()que le permite ejecutar un grupo de tareas en paralelo y luego llamar a una tarea de devolución de llamada con los resultados del grupo:

from celery import chord

# cначала выполняет add(2, 2) и add(4, 4) параллельно, затем результаты передаются в multiply()
result = chord((add.s(2, 2), add.s(4, 4)))(multiply.s(2))

Varios ejemplos de aplicación

Sistema de procesamiento de imágenes escalable

from celery import Celery
import PIL
from PIL import Image

app = Celery('image_processor', broker="pyamqp://guest@localhost//")

@app.task
def resize_image(image_path, output_path, size):
    with Image.open(image_path) as img:
        img.thumbnail(size, PIL.Image.ANTIALIAS)
        img.save(output_path)

@app.task
def crop_image(image_path, output_path, crop_box):
    with Image.open(image_path) as img:
        cropped_img = img.crop(crop_box)
        cropped_img.save(output_path)

# юзаем
resize_image.delay('path/to/image.jpg', 'path/to/resized_image.jpg', (800, 600))
crop_image.delay('path/to/image.jpg', 'path/to/cropped_image.jpg', (100, 100, 400, 400))

Envío de notificaciones asincrónicas

El apio se utiliza a menudo para enviar correos electrónicos, SMS o notificaciones automáticas a los usuarios de forma asincrónica. Ejemplo de envío de correos electrónicos usando SMTP:

from celery import Celery
import smtplib

app = Celery('notifications', broker="pyamqp://guest@localhost//")

@app.task
def send_email(recipient, subject, body):
    server = smtplib.SMTP('smtp.example.com', 587)
    server.starttls()
    server.login("your_email@example.com", "your_password")
    message = f"Subject: {subject}\n\n{body}"
    server.sendmail("your_email@example.com", recipient, message)
    server.quit()

# пример использования
send_email.delay("user@example.com", "Welcome!", "Thank you for registering with us.")

Ejecución asincrónica de tareas de larga duración utilizando el ejemplo de generación de informes.

from celery import Celery
import time

app = Celery('reports', broker="pyamqp://guest@localhost//")

@app.task
def generate_report(report_id, parameters):
    # операции по извлечению данных, их обработке и анализе
    time.sleep(60) # имитация длительной операции
    # сохранение или отправка отчета
    return f"Report {report_id} generated with parameters {parameters}"

# использование
generate_report.delay("report_123", {"param1": "value1", "param2": "value2"})

integración de Django

Cree un proyecto Django y agregue una aplicación:

django-admin startproject myproject
cd myproject
django-admin startapp myapp

Crear un archivo celery.py en el directorio raíz del proyecto Django (en el settings.py):

# myproject/celery.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace="CELERY")
app.autodiscover_tasks()

Añade las siguientes líneassettings.pypara decirle a Celery que use Redis como su intermediario de mensajes:

# myproject/settings.py

CELERY_BROKER_URL = 'redis://localhost:6379/0'

Vaya a la aplicación creada dentro del proyecto Django y cree un archivo tasks.py. Por ejemplo, creemos un problema para sumar dos números:

# myapp/tasks.py

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

Ahora puedes usar esta tarea de forma asíncrona desde modelos o cualquier otra parte de Django. Por ejemplo, agreguemos una llamada de tarea:

# myproject/celery.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace="CELERY")
app.autodiscover_tasks()

Para realizar tareas asincrónicas, debe iniciar el trabajador Celery:

celery -A myproject worker --loglevel=info

Esto iniciará un trabajador que escuchará y realizará tareas asincrónicas.

También puedes mejorar el seguimiento de tareas y trabajadores con flor:

celery -A myproject flower

Déjame recordarte que en dentro de los cursos online de OTUS podrás aprender los idiomas más popularesy también registrarse para una serie de eventos gratuitos.

Publicaciones Similares

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *