A good replacement for Celery

In my last article, “How to make Celery and SQLAlchemy 2.0 friends with asynchronous Python”, I explored running asynchronous tasks “from under” Celery, and in the comments I was told about another library called aio_pika. To be honest, I had never heard of it before. That is not surprising: the library has only around 1K stars on GitHub (compared to 20K+ for Celery). I reviewed all the popular (500+ stars) alternatives and settled on this one because of its active (at the moment) development and relative popularity.

The stack you will see in this article: FastAPI, RabbitMQ, aio_pika and Docker. The article will be useful both to those who use Celery in their projects and to those who have only heard about queues and RabbitMQ.

Navigation:

  1. RabbitMQ Configuration

  2. Task router for consumer

  3. Writing a consumer

  4. Integration into the main application

Foreword

The library positions itself as “a wrapper around aiormq for asyncio and humans”. My goal was to replace the Celery used in the project with it. I decided to do this because Celery’s interface does not really split the application and the workers into separate services, which is something I very much wanted. Secondary reasons were Celery’s lack of asynchrony, its whiff of legacy (I mean the self argument that bound tasks must accept as their first parameter), and the lack of type hints (the least important point!). Celery was used in the project for IO-bound and delayed tasks, so support for asynchrony was very welcome.

RabbitMQ Configuration

I updated my RabbitMQ setup by adding the “RabbitMQ Delayed Message Plugin”. I needed it for “postponed” tasks: specifically, temporary files had to be deleted after a certain time. Celery handled this natively, but, as far as I can tell, aio_pika does not. This plugin adds the functionality to RabbitMQ itself. My docker-compose config now looks like this:

docker-compose.yaml
  rabbit:
    image: rabbitmq:3-management
    hostname: rabbit
    env_file:
      - .env
    volumes:
      - ./services/rabbit/delayed_message.ez:/opt/rabbitmq/plugins/delayed_message.ez
      - ./services/rabbit/enabled:/etc/rabbitmq/enabled_plugins
    ports:
      - "15672:15672"

Through volumes I mounted the downloaded plugin and also added it to the list of plugins enabled by default. My enabled_plugins file looked like this:

[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus].

*The trailing period is required
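
To verify that the plugin is actually loaded, one can try to declare an exchange of type x-delayed-message: RabbitMQ rejects the declaration if the plugin is not enabled. A minimal sketch (the connection URL is a placeholder):

import asyncio

import aio_pika
from aio_pika import ExchangeType


async def check_plugin() -> None:
    # Placeholder URL; substitute your own credentials and host
    connection = await aio_pika.connect_robust('amqp://guest:guest@localhost/')
    try:
        channel = await connection.channel()
        # This call raises a channel error if the plugin is missing
        await channel.declare_exchange(
            'plugin_check', ExchangeType.X_DELAYED_MESSAGE,
            arguments={'x-delayed-type': 'direct'}
        )
        print('Delayed message plugin is enabled')
    finally:
        await connection.close()


if __name__ == '__main__':
    asyncio.run(check_plugin())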

Task router for consumer

As the next step, I wrote a Router for my worker that would be convenient for me to use. This part came out a bit convoluted:

router.py
import os
from importlib import import_module
from typing import Callable, Optional, get_type_hints


class Router:
    _routes: dict[str, list[str]] = {}

    def __init__(self):
        # Collect module names from the tasks folder, skipping __init__
        modules = list(filter(
            lambda x: x != '__init__',
            map(lambda y: y.split('.')[0], os.listdir('tasks'))
        ))
        for module in modules:
            imported = import_module(f'tasks.{module}')
            if not hasattr(imported, '__all__'):
                continue
            self._routes[module] = imported.__all__
            del imported

    def _exists(self, module: str, method: str) -> bool:
        # A method is available only if its module exported it via __all__
        return method in self._routes.get(module, [])

    def get_method(self, action: str) -> Optional[Callable]:
        module = action.split(':')[0]  # file (module) name
        method = action.split(':')[1]  # function name
        if self._exists(module, method):
            return getattr(import_module(f'tasks.{module}'), method)

The _routes dictionary is filled with the tasks located in the tasks folder, which contains the functions (tasks) themselves. They must also be listed in the module’s __all__ variable to be exported. For clarity, a task looked something like this:

async def test(is_test: bool):
    print(f'Hello world! Value is: {is_test}')

__all__ = ['test']
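
For illustration, assuming the task above lives in tasks/example.py (the file name is my placeholder, not from the article), resolving it through the router looks like this:

router = Router()

method = router.get_method('example:test')  # tasks/example.py -> test
if method:
    print(method.__name__)  # prints 'test'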

The next problem to solve was that these functions take an arbitrary number of arguments. I wrote another method for the router to account for this:

router.py
# Another method on the Router class
def check_args(self, func: Callable, data: dict) -> bool:
    hints = get_type_hints(func)
    hints.pop('return', None)  # ignore the return annotation, if present
    for arg, arg_type in hints.items():
        if arg not in data:
            return False
        if not isinstance(data[arg], arg_type):
            return False
    return True

We pass this method the function we imported from the file, along with the data we are about to feed into it, and check the data against the types declared in the function’s arguments. If everything is OK, it returns True.
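
A quick illustration with the same hypothetical tasks/example.py module from above:

router = Router()
method = router.get_method('example:test')

print(router.check_args(method, {'is_test': True}))   # True
print(router.check_args(method, {'is_test': 'yes'}))  # False: wrong type
print(router.check_args(method, {}))                  # False: missing argument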

Thus, I controlled the set of available tasks simply by creating or deleting files in the tasks folder. It turned out to be a very convenient and flexible solution.

Writing a consumer

consumer.py

import asyncio
import inspect
import json

import aio_pika
from aio_pika import ExchangeType
from aio_pika.abc import AbstractIncomingMessage

from config import rabbit_config   # assumed project-local import paths;
from router import Router          # adjust them to your own layout
from schemas import MessageSchema

router = Router()  # assumed: the router is instantiated once at import time


async def process_message(message: AbstractIncomingMessage):
    async with message.process():
        parsed = MessageSchema.parse_obj(json.loads(message.body.decode()))
        method = router.get_method(parsed.action)  # import the function and store it
        if method:
            if not router.check_args(method, parsed.body):  # validate the arguments we are about to pass
                print('Invalid args')
                return
            if inspect.iscoroutinefunction(method):  # check whether the function is a coroutine
                await method(**parsed.body)
            else:
                method(**parsed.body)


async def main() -> None:
    queue_key = rabbit_config.RABBITMQ_QUEUE

    connection = await aio_pika.connect_robust(rabbit_config.url)
    # For correct interaction with RabbitMQ we set publisher_confirms=False
    channel = await connection.channel(publisher_confirms=False)
    # How many tasks the consumer may process at a time; 100 in my case
    await channel.set_qos(prefetch_count=100)
    queue = await channel.declare_queue(queue_key)

    exchange = await channel.declare_exchange(
        # Declare an exchange named 'main' with a type that supports delayed tasks.
        # This name ('main') must match the name used on the publisher side.
        'main', ExchangeType.X_DELAYED_MESSAGE,
        arguments={
            'x-delayed-type': 'direct'
        }
    )
    await queue.bind(exchange, queue_key)
    await queue.consume(process_message)
    try:
        await asyncio.Future()
    finally:
        await connection.close()


if __name__ == "__main__":
    asyncio.run(main())

That is it for the consumer side, and we can move on to integrating all this goodness into the main application (the publisher).
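
One thing the article does not show is the MessageSchema itself. Judging by how it is used (parse_obj on the consumer side, .dict() on the publisher side), a minimal pydantic v1 sketch could look like this; the field names are taken from the code above:

from pydantic import BaseModel


class MessageSchema(BaseModel):
    action: str  # 'module:function' string, e.g. 'images:delete'
    body: dict   # keyword arguments for the task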

Integration into the main application

OOP comes to the rescue again: I wrote a class for working with aio_pika that fully covered my needs. It is initialized in the new lifespan handler (which will soon completely replace the old startup/shutdown events):

from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(_: FastAPI):
    await rabbit_connection.connect()
    yield
    await rabbit_connection.disconnect()

app = FastAPI(lifespan=lifespan)
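
For comparison, the same wiring in the older event-handler style that lifespan is replacing:

@app.on_event('startup')
async def startup() -> None:
    await rabbit_connection.connect()


@app.on_event('shutdown')
async def shutdown() -> None:
    await rabbit_connection.disconnect()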

The following is the implementation of this class:

rabbit_connection.py
import json

from aio_pika import ExchangeType, Message, connect_robust
from aio_pika.abc import (
    AbstractRobustChannel,
    AbstractRobustConnection,
    AbstractRobustExchange,
)

from config import rabbit_config   # assumed import paths, as in consumer.py
from schemas import MessageSchema


class RabbitConnection:
    _connection: AbstractRobustConnection | None = None
    _channel: AbstractRobustChannel | None = None
    _exchange: AbstractRobustExchange | None = None

    async def disconnect(self) -> None:
        if self._channel and not self._channel.is_closed:
            await self._channel.close()
        if self._connection and not self._connection.is_closed:
            await self._connection.close()
        self._connection = None
        self._channel = None

    async def connect(self) -> None:
        try:
            self._connection = await connect_robust(rabbit_config.url)
            self._channel = await self._connection.channel(publisher_confirms=False)
            self._exchange = await self._channel.declare_exchange(
                # Same as on the consumer side: the exchange names must match.
                # In my case it is 'main'.
                'main', ExchangeType.X_DELAYED_MESSAGE,
                arguments={
                    'x-delayed-type': 'direct'
                }
            )
        except Exception:
            await self.disconnect()

    async def send_messages(
            self,
            messages: list[MessageSchema],
            *,
            routing_key: str = rabbit_config.RABBITMQ_QUEUE,
            delay: int | None = None  # delay before the task runs, in seconds
    ) -> None:
        async with self._channel.transaction():
            headers = None
            if delay:
                headers = {
                    'x-delay': f'{delay * 1000}'  # milliseconds, per the RabbitMQ plugin documentation
                }
            for message in messages:
                message = Message(
                    body=json.dumps(message.dict()).encode(),
                    headers=headers
                )
                await self._exchange.publish(
                    message,
                    routing_key=routing_key,
                    mandatory=False if delay else True  # keeps the logs tidy ;)
                )


rabbit_connection = RabbitConnection()

As a result, in order to send work to the worker, it was enough to do the following:

main.py
# Here router is a FastAPI APIRouter, not the task Router from above
@router.get('/test')
async def test():
    message = MessageSchema(
        action='images:delete',
        body={'path': 'assets/temp/temp.png'}
    )
    await rabbit_connection.send_messages(
        [message for _ in range(150)],
        delay=20
    )
    return {'status': 'published'}
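
The images:delete task itself is not shown in the article; on the consumer side it could be as simple as this sketch of tasks/images.py (my assumption, not the author’s actual code):

import os


async def delete(path: str):
    # Remove the temporary file if it still exists
    if os.path.exists(path):
        os.remove(path)


__all__ = ['delete']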

Summing up, I would like to say that the worker now feels much more confident: it can do much more, and faster. I hope the article was helpful. Thank you all, bye bye.
