Asynchronous Jobs in Django with Celery

If your application has a long-running process, you should handle it in the background rather than in the standard request/response flow.

For example, say a user of your application submits an image thumbnail (which will most likely need to be processed) and must confirm their email address. If your application processes the image and then sends the confirmation email directly in the request handler, the end user has to wait for both tasks to finish before the page reloads or closes. Instead, you can hand these operations off to a task queue and let a separate worker process deal with them, so a response can be sent to the user right away. The end user can then do other things on the client side while the processing runs in the background, and your application stays free to respond to requests from other users and clients.

Today we'll walk through setting up and configuring Celery and Redis to handle long-running processes in a Django application. We'll also use Docker and Docker Compose to tie all the pieces together, and see how to test Celery jobs with unit and integration tests.

By the end of this guide, you will know how to:

  • Integrate Celery into Django to create background jobs.
  • Containerize Django, Celery, and Redis with Docker.
  • Run processes in the background with a separate worker process.
  • Save Celery logs to a file.
  • Set up Flower to monitor and administer Celery jobs and workers.
  • Test Celery jobs with unit and integration tests.

Background Tasks

To improve the user experience, long-running processes should run in the background, outside the normal HTTP request/response cycle.

For instance:

  • Sending confirmation emails;
  • Web scraping and crawling;
  • Data analysis;
  • Image processing;
  • Report generation.

When building an application, try to separate tasks that should run during the request/response lifecycle, such as CRUD operations, from those that should run in the background.
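As a rough sketch (the view and task names here are hypothetical, not part of this project), the split usually looks like this:

# Hypothetical sketch, not code from this project: keep the fast CRUD work
# in the view and push the slow work onto the task queue.
from celery import shared_task
from django.http import JsonResponse

@shared_task
def send_confirmation_email(address):
    ...  # slow work (e.g. talking to an SMTP server); runs on a Celery worker

def signup(request):
    email = request.POST.get("email")
    # Fast CRUD work (creating the user record) stays in the request/response;
    # the slow email send is queued and handled in the background.
    send_confirmation_email.delay(email)
    return JsonResponse({"email": email}, status=202)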

Workflow

Our goal is to develop a Django application that uses Celery to handle lengthy processes outside the request / response cycle.

  1. The end user generates a new job by sending a POST request to the server.
  2. In this view, the job is added to the queue, and the job id is sent back to the client.
  3. Using AJAX, the client continues to query the server to check the status of the job, while the job itself is running in the background.

Project creation

Clone the project from the django-celery repository and check out the v1 tag to the master branch:

$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master

Since we need to work with three processes in total (Django, Redis, and a Celery worker), we'll use Docker to simplify things, wiring them together so that everything can be run with a single command in one terminal window.

From the project root, create images and launch Docker containers:

$ docker-compose up -d --build

When the build is complete, go to http://localhost:1337:

Make sure the tests pass successfully:

$ docker-compose exec web python -m pytest

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item

tests/test_tasks.py .                                                                         [100%]

========================================= 1 passed in 0.47s =========================================

Let’s take a look at the structure of the project before moving on:

├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── pytest.ini
    ├── requirements.txt
    ├── static
    │   ├── bulma.min.css
    │   ├── jquery-3.4.1.min.js
    │   ├── main.css
    │   └── main.js
    ├── tasks
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── templates
    │   │   └── home.html
    │   └── views.py
    └── tests
        ├── __init__.py
        └── test_tasks.py

Job launch

An event handler in project/static/main.js listens for button clicks. On click, an AJAX POST request is sent to the server with the appropriate task type: 1, 2, or 3.

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

On the server side, a view has already been configured to process the request in project/tasks/views.py:

def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        return JsonResponse({"task_type": task_type}, status=202)

And now the fun part: wiring up Celery!

Celery Setup

Let's start by adding both Celery and Redis to project/requirements.txt:

celery==4.4.1
Django==3.0.4
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Celery uses a message broker – RabbitMQ, Redis, or AWS Simple Queue Service (SQS) – to facilitate communication between the Celery worker and the web application. Messages are sent to the broker and then processed by a worker. Once done, the results are sent to the backend.

Redis will serve as both the broker and the backend. Add Redis and a Celery worker to the docker-compose.yml file as follows:

version: '3.7'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./project
    command: celery worker --app=core --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:5-alpine

Take note of celery worker --app=core --loglevel=info:

  1. celery worker launches the Celery worker;
  2. --app=core runs the core Celery application (which we'll define shortly);
  3. --loglevel=info sets the logging level to info.

Add the following to the project settings module so that Celery uses Redis as a broker and backend:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BACKEND", "redis://redis:6379/0")

Then create the file sample_tasks.py in project/tasks:

# project/tasks/sample_tasks.py

import time

from celery import shared_task

@shared_task
def create_task(task_type):
    time.sleep(int(task_type) * 10)
    return True

Here, using the shared_task decorator, we defined a new Celery task function called create_task.

Remember that the task itself will not be executed from the Django process; it will be executed by the Celery worker.
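To make the difference concrete, here's a small sketch (the actual wiring happens in the view we update further down): calling the function directly runs it in the current process, while .delay() only enqueues it for a worker to pick up.

# Illustrative sketch -- not part of the project's code.
from tasks.sample_tasks import create_task

create_task(1)                  # runs time.sleep(10) right here, in this process
result = create_task.delay(1)   # enqueued; a Celery worker executes it
print(result.id)                # AsyncResult id, used later to poll the status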

Now add a celery.py file to "project/core":

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

What’s going on here?

  1. First, we set a default value for the DJANGO_SETTINGS_MODULE environment variable so that Celery knows how to find the Django project.
  2. Next, we created a Celery instance named core and assigned it to the variable app.
  3. We then loaded the Celery configuration values from the settings object in django.conf. We used namespace="CELERY" to prevent clashes with other Django settings; as a result, all Celery configuration settings must be prefixed with CELERY_ (see the sketch after this list).
  4. Finally, app.autodiscover_tasks() tells Celery to look for jobs in the applications defined in settings.INSTALLED_APPS.
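For example (a sketch only; the project itself needs nothing beyond the two broker/backend settings added earlier), with this namespace a Celery option such as task_serializer is read from a Django setting named CELERY_TASK_SERIALIZER:

# core/settings.py -- illustrative only, not required by this project.
# Because of namespace="CELERY", these map to Celery's task_serializer,
# result_serializer, and timezone options.
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "UTC"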

Change project/core/__init__.py so that the Celery application is automatically imported when Django starts:

from .celery import app as celery_app


__all__ = ("celery_app",)

Job launch

Update the view to start the job and send back its id:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)

Do not forget to import the task:

from tasks.sample_tasks import create_task

Rebuild the images and spin up the new containers:

$ docker-compose up -d --build

To start a new task, do:

$ curl -F type=0 http://localhost:1337/tasks/

You will see something like this:

{
  "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}

Job Status

Back to the event handler on the client side:

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

When a response comes back from that AJAX request, we call getStatus() with the task id every second:

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}/`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.task_id}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);

    const taskStatus = res.task_status;

    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err)
  });
}

If the response is successful, a new row is added to the table in the DOM. Update the get_status view to return the status:

@csrf_exempt
def get_status(request, task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JsonResponse(result, status=200)

Import AsyncResult:

from celery.result import AsyncResult
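For reference, these two views are reached through named URL patterns; a sketch of what that routing looks like is below (the actual patterns already live in project/core/urls.py, so this is for orientation only):

# Sketch of the routing the views and tests rely on; see project/core/urls.py
# for the real thing.
from django.urls import path

from tasks.views import get_status, run_task

urlpatterns = [
    path("tasks/", run_task, name="run_task"),
    path("tasks/<task_id>/", get_status, name="get_status"),
]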

Update containers:

$ docker-compose up -d --build

Run a new task:

$ curl -F type=1 http://localhost:1337/tasks/

Then grab the task_id from the response and call the updated get_status endpoint to see the status:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/

{
    "task_id": "25278457-0957-4b0b-b1da-2600525f812f",
    "task_status": "SUCCESS",
    "task_result": true
}
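If you prefer to drive this from a script rather than curl or the browser, here is a sketch using the requests package (note that requests is not in the project's requirements.txt, so you would need to install it separately):

# poll_task.py -- a sketch, not part of the project. Assumes the containers
# are running and the app is reachable at localhost:1337.
import time

import requests

BASE = "http://localhost:1337/tasks/"

# Kick off a task (type=1 makes the worker sleep for about 10 seconds).
task_id = requests.post(BASE, data={"type": 1}).json()["task_id"]

# Poll the status endpoint, just like the jQuery getStatus() function does.
while True:
    payload = requests.get(f"{BASE}{task_id}/").json()
    print(payload["task_status"])
    if payload["task_status"] in ("SUCCESS", "FAILURE"):
        break
    time.sleep(1)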

You can see the same information in the browser:

Celery Logs

Update the celery service in docker-compose.yml so that Celery logs are written to a separate file:

celery:
  build: ./project
  command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

Add a new directory called "logs" to "project". Then add a celery.log file to that new directory.

Update:

$ docker-compose up -d --build

Since we set up a volume, you should see the log file being populated locally:

[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
    /usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
    UserWarning: Using settings.DEBUG leads to a memory
    leak, never use this setting in production environments!
    warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
    Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
    Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
    succeeded in 10.027462100006233s: True

Flower Dashboard

Flower is a lightweight, web-based tool for monitoring Celery in real time. You can track running tasks, increase or decrease the worker pool, view graphs and statistics, and more.

Add it to requirements.txt:

celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Then add the new service to docker-compose.yml:

dashboard:
  build: ./project
  command:  flower -A core --port=5555 --broker=redis://redis:6379/0
  ports:
    - 5555:5555
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - celery

And test:

$ docker-compose up -d --build

Go to http://localhost:5555 to view the dashboard. You should see one worker:

Run a few more tasks to test the dashboard:

Try adding more workers and see how it affects performance:

$ docker-compose up -d --build --scale celery=3

Tests

Let’s start with the simplest test:

def test_task():
    assert sample_tasks.create_task.run(1)
    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run(3)

Add the test case above to project/tests/test_tasks.py and add the following import:

from tasks import sample_tasks

Run this test:

$ docker-compose exec web python -m pytest -k "test_task and not test_home"

This test will take about a minute:

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================

It's worth noting that in the assertions above we used the .run method (rather than .delay) to execute the tasks directly, without a Celery worker.
Want to mock the .run method to speed things up?

@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
    assert sample_tasks.create_task.run(1)
    sample_tasks.create_task.run.assert_called_once_with(1)

    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run.call_count == 2

    assert sample_tasks.create_task.run(3)
    assert sample_tasks.create_task.run.call_count == 3

Import:

from unittest.mock import patch, call
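If you prefer not to use the decorator form, the same check can be written with patch as a context manager (an equivalent sketch, not an extra test from the project):

# Equivalent sketch using patch as a context manager instead of a decorator.
from unittest.mock import patch

from tasks import sample_tasks

def test_mock_task_ctx():
    with patch("tasks.sample_tasks.create_task.run") as mock_run:
        assert sample_tasks.create_task.run(1)
        mock_run.assert_called_once_with(1)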

Test:

$ docker-compose exec web python -m pytest -k "test_mock_task"

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

================================== 1 passed, 2 deselected in 1.13s ==================================

See? Now much faster!

What about full integration testing?

def test_task_status(client):
    response = client.post(reverse("run_task"), {"type": 0})
    content = json.loads(response.content)
    task_id = content["task_id"]
    assert response.status_code == 202
    assert task_id

    response = client.get(reverse("get_status", args=[task_id]))
    content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = client.get(reverse("get_status", args=[task_id]))
        content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

Keep in mind that this test uses the same broker and backend as development. You may want to spin up a new Celery instance for testing:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)

Add import:

import json

And make sure the tests are successful.
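Another option worth knowing about (a sketch, not something this project sets up) is Celery's eager mode: with task_always_eager enabled, .delay() runs the task synchronously in-process, so no broker, backend, or worker is needed at all. A hypothetical pytest fixture might look like this:

# conftest.py (sketch) -- hypothetical fixture, not part of the project.
import pytest

from core.celery import app as celery_app

@pytest.fixture
def eager_celery():
    # task_always_eager makes .delay()/.apply_async() run the task in-process.
    celery_app.conf.task_always_eager = True
    yield
    celery_app.conf.task_always_eager = False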

Conclusion

Today we covered the basic configuration of Celery for running long-term tasks in a Django application. Any process that could slow things down for the user should be sent off to the task queue.

Celery can also be used to run repetitive, periodic tasks and to break up complex, resource-intensive tasks, so that the computational load can be spread across several machines, reducing both execution time and the load on the machine that handles client requests.
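For instance, periodic tasks are usually scheduled with Celery beat, which this project does not run; the snippet below is only a sketch of what such a schedule could look like in core/celery.py (a separate beat process must run alongside the worker):

# Sketch: periodic scheduling with Celery beat (not set up in this project).
app.conf.beat_schedule = {
    "create-task-every-five-minutes": {
        "task": "tasks.sample_tasks.create_task",
        "schedule": 300.0,   # seconds
        "args": (1,),
    },
}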

You can find all of the code in this repository.

