How to make Celery and SQLAlchemy 2.0 friends with asynchronous Python

For the last six months I have been thinking about leaving my beloved Python for something like Rust or Go, because, whatever one may say, it becomes painful to write in it once the tasks get more “interesting”. Many will argue with me, of course, but I still see wrapping everything that blocks the GIL into asyncio or threading calls as one big crutch bolted onto Python’s otherwise elegant syntax.

Recently I was handed a Python project that needed to be dusted off and made to run a bit more productively. As a result, the monolith was split into microservices, with the well-known RabbitMQ and Celery, as old as Python itself, acting as the broker between them. The project was ported from Django to FastAPI, which, in my subjective opinion, is the ideal choice for any Python backend, unless we are talking about something highly loaded where it is worth stepping down from Python to another language. That, in general, is the appeal of microservices: most of the codebase stays in cheap-to-develop Python, while the bottlenecks are extracted into separate services written in other languages.

Let’s start with the docker-compose configuration file:

version: '3.8'

services:
  db:
    image: postgres:15.1-alpine
    env_file:
      - ./.env
    volumes:
      - postgres_data:/var/lib/postgresql/data/

  app:
    build: ./backend
    depends_on:
      - db
    env_file:
      - ./.env
    ports:
      - "8000:8000"
    volumes:
      - ./backend/src:/app/

  ...

  rabbit:
    image: rabbitmq:3.11.9-management
    hostname: rabbit
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
      - RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit disk_free_limit 1147483648
    volumes:
      - ./rabbitmq:/var/lib/rabbitmq

  flower:
    image: mher/flower
    environment:
      - CELERY_BROKER_URL=amqp://admin:admin@rabbit:5672//
      - RESULT_BACKEND=rpc://
      - FLOWER_PORT=5555
    ports:
      - "5555:5555"
    depends_on:
      - rabbit

  worker:
    build: ./backend
    command: python -m celery -A celery_app.celery worker --loglevel=info -Q celery --logfile=celery_app/celery.log
    volumes:
      - ./backend/src:/app/
    env_file:
      - ./.env
    depends_on:
      - rabbit
    environment:
      - C_FORCE_ROOT=yes

To monitor Celery tasks, I once again used the familiar and painfully simple Flower. I also passed disk_free_limit as an additional argument to RabbitMQ in order to raise the disk space limit it applies when accepting messages. I will not dwell on each Dockerfile, because there is nothing specific in them. There is nothing complicated about the Celery configuration either, and there are plenty of manuals on the Internet. So let’s go straight to the heart of the problem: what exactly I struggled with.
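
For context, here is a minimal sketch of what the celery_app module behind the worker command could look like. The broker and result backend URLs are taken from the compose file above, while the module layout and the "tasks" import are my assumptions, not code from the project:

from celery import Celery

# Broker and result backend match the RabbitMQ and Flower settings above;
# include=["tasks"] assumes the tasks live in a tasks.py module
celery = Celery(
    "celery_app",
    broker="amqp://admin:admin@rabbit:5672//",
    backend="rpc://",
    include=["tasks"],
)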

My database connection setup with SQLAlchemy looks like this:

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

# DATABASE_URL comes from the project settings
engine = create_async_engine(
    DATABASE_URL,
    echo=True
)

session: async_sessionmaker[AsyncSession] = async_sessionmaker(
    engine,
    expire_on_commit=False
)

To my dismay, nothing new or interesting has appeared in Celery. To use an asynchronous session you have to call it from asynchronous functions, which means those functions need to be wrapped in something so that Celery does not complain.

First of all, I created a loop in the global scope of my tasks.py file, which holds all the Celery tasks (there are only four of them, for what it’s worth). It looked like this:

import asyncio

loop = asyncio.get_event_loop()
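
A small caveat of my own: on newer Python versions, calling asyncio.get_event_loop() when no loop is running raises a deprecation warning, so an explicit setup like the sketch below may be the safer variant:

# Explicit alternative to get_event_loop() for newer Python versions
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)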

My session factory also needed to be wrapped in async_scoped_session to avoid errors caused by several application instances (the worker and FastAPI itself) connecting to the session at the same time. It looks like this:

from asyncio import current_task
from contextlib import asynccontextmanager

from sqlalchemy.ext.asyncio import async_scoped_session


@asynccontextmanager
async def scoped_session():
    # Scope sessions to the current asyncio task so that concurrent
    # consumers do not end up sharing one AsyncSession instance
    scoped_factory = async_scoped_session(
        session,
        scopefunc=current_task,
    )
    try:
        async with scoped_factory() as s:
            yield s
    finally:
        # Required for scoped sessions: drop the session registered
        # for the current task
        await scoped_factory.remove()

The important thing in this code is the .remove() call at the end, which is needed to correctly tear down the session and applies only to scoped sessions (with a regular session the context manager hides all the work of opening and closing). You can read more about this in the documentation. With all of that in place, we can now use our session like this:

async with scoped_session() as s:
    await s.execute(...)
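
Since the same session factory is shared with the FastAPI application, the helper can also back a dependency there. Below is a minimal sketch of such a dependency; the app object, endpoint, and query are purely illustrative and not taken from the project:

from fastapi import Depends, FastAPI
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()


async def get_session():
    # Yield a task-scoped session to the endpoint and clean it up afterwards
    async with scoped_session() as s:
        yield s


@app.get("/db-health")
async def db_health(s: AsyncSession = Depends(get_session)):
    await s.execute(text("SELECT 1"))
    return {"db": "ok"}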

As for Celery, since we cannot declare tasks as async functions, we have to move all the asynchronous code into separate functions and run them on the loop defined in tasks.py. A task then looks something like this:

from celery import shared_task


@shared_task(
    bind=True,
    name="celery:test"
)
def test_task(self, data: dict, prices: dict):
    # Run the asynchronous part of the task on the module-level loop
    result = loop.run_until_complete(your_async_function_here(its_arguments))
    return result
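
For completeness, here is a purely hypothetical example of what such an async function could look like, reusing the scoped_session helper from above; the name process_prices and the query are my illustrations, not code from the project:

from sqlalchemy import text


# Hypothetical async helper invoked from the Celery task via
# loop.run_until_complete(process_prices(data, prices))
async def process_prices(data: dict, prices: dict) -> int:
    async with scoped_session() as s:
        result = await s.execute(text("SELECT count(*) FROM prices"))
        return result.scalar_one()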

After all these manipulations, everything started up and now works correctly and quickly. If someone has ideas for a better implementation, you are welcome in the comments. I will also note that I do not consider my version the most correct one, because there will always be someone who knows more than me or you in a particular area. I hope for objective criticism and at least some benefit from what was written. Subscribe, leave likes, good luck to everyone, bye-bye!
