How to make Celery and SqlAlchemy 2.0 friends with asynchronous Python
For the last six months I have been thinking about leaving my beloved Python for Rust or Go, because, whatever one may say, it becomes painful to write in when the tasks get a little more "interesting". Of course, many will argue with me, but I will keep seeing the need to wrap everything that blocks the GIL into `asyncio` or `threading` library calls as one big crutch bolted onto Python's otherwise elegant syntax.
Recently I was faced with a task where a Python project needed to be dusted off and made to work a little more efficiently. As a result, the monolith was split into microservices, with the well-known RabbitMQ and Celery, as old as Python itself, acting as the broker and task queue between the services. The project was also ported from Django to FastAPI, which, in my subjective opinion, is the ideal choice for any Python backend, unless we are talking about something highly loaded where it is worth stepping down from Python to another language. In general, microservices are what make it possible to keep most of the codebase cheap to develop in Python while moving the bottlenecks into services written in other languages.
Let's start with the `docker-compose` configuration file:
```yaml
version: '3.8'

services:
  db:
    image: postgres:15.1-alpine
    env_file:
      - ./.env
    volumes:
      - postgres_data:/var/lib/postgresql/data/

  app:
    build: ./backend
    depends_on:
      - db
    env_file:
      - ./.env
    ports:
      - "8000:8000"
    volumes:
      - ./backend/src:/app/
    ...

  rabbit:
    image: rabbitmq:3.11.9-management
    hostname: rabbit
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
      - RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit disk_free_limit 1147483648
    volumes:
      - ./rabbitmq:/var/lib/rabbitmq

  flower:
    image: mher/flower
    environment:
      - CELERY_BROKER_URL=amqp://admin:admin@rabbit:5672//
      - RESULT_BACKEND=rpc://
      - FLOWER_PORT=5555
    ports:
      - "5555:5555"
    depends_on:
      - rabbit

  worker:
    build: ./backend
    command: python -m celery -A celery_app.celery worker --loglevel=info -Q celery --logfile=celery_app/celery.log
    volumes:
      - ./backend/src:/app/
    env_file:
      - ./.env
    depends_on:
      - rabbit
    environment:
      - C_FORCE_ROOT=yes
```
To monitor Celery tasks I once again used the familiar and delightfully simple Flower. I also passed `disk_free_limit` as an additional argument to RabbitMQ in order to adjust the free disk space threshold below which the broker stops accepting new messages. I won't dwell on each `Dockerfile`, because there is nothing specific in them. There is nothing complicated about the Celery configuration either; there are plenty of manuals on the Internet. So let's go straight to the heart of the problem: what specifically I had difficulty with.
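For context, a minimal Celery configuration in this setup might look roughly like the sketch below. The module name `celery_app.py`, the environment variable names and the queue name are my assumptions here, chosen to match the `worker` command in the compose file, not code from the original project:

```python
# celery_app.py -- a minimal sketch of the Celery application object.
# The env variable names mirror those passed to the flower container;
# the default URLs match the rabbit service above.
import os

from celery import Celery

celery = Celery(
    "worker",
    broker=os.getenv("CELERY_BROKER_URL", "amqp://admin:admin@rabbit:5672//"),
    backend=os.getenv("RESULT_BACKEND", "rpc://"),
)
celery.conf.task_default_queue = "celery"
celery.autodiscover_tasks(["tasks"])
```

The worker then picks this up via `-A celery_app.celery`, as in the compose file.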
My database connection setup with SQLAlchemy looks like this:
```python
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

engine = create_async_engine(
    DATABASE_URL,
    echo=True,
)

session: async_sessionmaker[AsyncSession] = async_sessionmaker(
    engine,
    expire_on_commit=False,
)
```
To my dismay, nothing new or interesting has appeared in Celery. To use an asynchronous session, you have to call asynchronous functions, which means you need to wrap them in something so that Celery doesn't complain.
First of all, I created a `loop` in the global scope of my `tasks.py` file, which holds all the Celery tasks (there are only four of them, for the record). It looked like this:

```python
loop = asyncio.get_event_loop()
```
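One caveat worth noting: on recent Python versions, calling `asyncio.get_event_loop()` when no loop is running is deprecated, so a slightly more future-proof variant of the same idea is to create the module-level loop explicitly. This is my own sketch, not code from the project:

```python
import asyncio

# Create the module-level loop explicitly instead of relying on
# asyncio.get_event_loop(), which is deprecated outside a running
# loop on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

async def ping() -> str:
    # A trivial coroutine, just to show the loop is usable.
    await asyncio.sleep(0)
    return "pong"

print(loop.run_until_complete(ping()))  # → pong
```

The loop lives as long as the worker process does, so every task reuses it instead of paying for loop creation on each call.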
My session also needed to be wrapped with `async_scoped_session` to avoid the errors caused by several application instances (the worker and FastAPI itself) connecting to the same session concurrently. It looked like this:
```python
from asyncio import current_task
from contextlib import asynccontextmanager

from sqlalchemy.ext.asyncio import async_scoped_session


@asynccontextmanager
async def scoped_session():
    scoped_factory = async_scoped_session(
        session,
        scopefunc=current_task,
    )
    try:
        async with scoped_factory() as s:
            yield s
    finally:
        await scoped_factory.remove()
```
The important thing in this code is the call to the `.remove()` method at the end, which is required to terminate a scoped session correctly; this applies only to scoped sessions (with a regular session, the context manager hides all the work of opening and closing). You can read more about this in the SQLAlchemy documentation. With all of that done, we can now use our session like this:
```python
async with scoped_session() as s:
    await s.execute(...)
```
As for Celery, since we cannot declare tasks as `async` functions, we need to move all the asynchrony into separate coroutines and drive them with the `loop` defined in `tasks.py`. Our task will then look something like this:
```python
@shared_task(
    bind=True,
    name="celery:test",
)
def test_task(self, data: dict, prices: dict):
    result = loop.run_until_complete(your_async_function_here(and_its, arguments))
    return result
```
After all these manipulations, everything started up and works correctly and quickly. If anyone has ideas for a better implementation, you are welcome in the comments. I'll also note that I don't consider my version the most correct one; there will always be someone who knows more than you or me in any particular area. I hope for objective criticism and at least some benefit from what I've written. Subscribe, like, good luck to everyone, bye!