Deferred execution system on RabbitMQ


Hello!

My name is Nikita, and I oversee the development of several projects at DomClick. Today I want to continue the theme of “funny pictures” in the RabbitMQ world. In his article, Alexey Kazakov looked at a powerful tool, deferred queues, and at different implementations of the Retry strategy. Today we’ll talk about how to use RabbitMQ to schedule periodic tasks.

Why did we need to reinvent the wheel, and why did we abandon Celery and other task management tools? The fact is that they did not meet our needs and our fault-tolerance requirements, which are quite strict in our company.

When switching to Docker and Kubernetes, many developers run into problems organizing periodic tasks: cron jobs take a lot of fiddling to launch, and control over the process leaves much to be desired. On top of that, there are problems with peak loads during the daytime.

My task was to build a reliable system for processing periodic tasks that would also be easy to scale and fault-tolerant. Our project is written in Python, so it was logical to check whether Celery would suit us. It is a good tool, but we have often run into problems with reliability, scalability and seamless releases when using it. One pod means one process group. To scale Celery you have to increase the resources of a single pod, because there is no synchronization between pods, and that means stopping task processing, albeit temporarily. And if the tasks are also long-running, you can guess how difficult this is to manage. The second obvious drawback: there is no support for asynchronous code out of the box, which matters to us because our tasks consist mainly of I/O operations, while Celery's workers rely on processes and threads.

At that time (2018), we did not find a suitable ready-made tool, and began to develop our own. Taking as a basis the functionality of the deferred execution of tasks and the Dead Letter Exchange, we decided to create a system for processing periodic tasks. The concept looked something like this:

image

I’ll try to explain what’s what.

  1. Tasks are sent as a message to the Scheduler exchange.
  2. By routing_key they land in the appropriate Hatchery queue, which has the message_ttl parameter set and is linked to the Processor exchange as its dead letter exchange. The “maturation” queue is not tied to the type of task; it only plays the role of a “timer”. That is, you can create as many queues as you need periods and choose between them through routing_key.
  3. Since the queue has no listeners, the messages, after “maturing” in the queue, go to the Processor exchange.
  4. Then the free consumer (Processing consumer) picks up the message and executes. After execution, the cycle is repeated if necessary.
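
The steps above can be sketched as plain data. This is only a minimal sketch, not our actual setup code: the function name, the exchange name and the routing keys are made up for illustration. The only things taken from RabbitMQ itself are the queue arguments `x-message-ttl` (what I call message_ttl in this article) and `x-dead-letter-exchange`.

```python
def hatchery_queue_arguments(period_seconds, processor_exchange="processor"):
    """Build the arguments for a "timer" queue that nobody consumes from.

    Messages expire after `period_seconds`, and RabbitMQ then dead-letters
    them to `processor_exchange`, where the workers are listening.
    """
    if period_seconds <= 0:
        raise ValueError("period_seconds must be positive")
    return {
        # RabbitMQ expects the TTL in milliseconds.
        "x-message-ttl": int(period_seconds * 1000),
        # Expired messages are republished to this exchange.
        "x-dead-letter-exchange": processor_exchange,
    }


# One Hatchery queue per period; the routing key selects the period.
# The keys and periods below are hypothetical.
PERIODS = {"delay.1m": 60, "delay.1h": 3600}
TOPOLOGY = {
    routing_key: hatchery_queue_arguments(seconds)
    for routing_key, seconds in PERIODS.items()
}
```

A dictionary like this would be passed as the queue arguments when the Hatchery queue is declared. Since no dead letter routing key is set here, a dead-lettered message keeps its original routing key when it arrives at the Processor exchange.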

What is the advantage of such a scheme?

  1. Phased execution, that is, a new task will not be processed if the previous one has not completed.
  2. A single listener (consumer), that is, you can make both universal workers and specialized ones. Scaling is done by simply increasing the number of pods.
  3. Deploy new tasks without disrupting the work of the current ones. It is enough to softly update the listener pods and send the appropriate message to the queue. That is, you can raise pods with new code, which will deal with new messages, and the current processes will survive in the old pods. This gives us a seamless update.
  4. You can use asynchronous code and any infrastructure, while being stack independent.
  5. You can control the execution of tasks at the native ack/reject level, and also add an optional queue (control queue) that tracks the lifecycle of tasks.

The scheme turned out to be quite simple, and we quickly built a working prototype. The code is neat, too. It is enough to mark the callback function with a simple decorator that controls the message lifecycle.

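from functools import wraps

# log_error, get_count_of_redelivery_attempts, resend_msg and app are
# project helpers that are not shown in this article.


def rmq_scheduler(routing_key_for_delay_queue, routing_key_for_processing_queue):
    def decorator(func):
        @wraps(func)
        async def wrapper(channel, body, envelope, properties):
            try:
                res = await func(channel, body, envelope, properties)
                # Success: put the message back into the delay queue
                # so that the cycle repeats after the TTL expires.
                await channel.publish(
                    payload=body,
                    exchange_name="",
                    routing_key=routing_key_for_delay_queue,
                )
                await channel.basic_client_ack(envelope.delivery_tag)
                return res
            except Exception as e:
                log_error(e)
                redelivered_count = get_count_of_redelivery_attempts(properties)
                if redelivered_count <= 3:
                    # Retry: send the message straight back for processing.
                    await resend_msg(
                        channel=channel,
                        body=body,
                        properties=properties,
                        routing_key=routing_key_for_processing_queue,
                    )
                else:
                    # Too many retries: wait for the next period instead.
                    # conn is not used by the code shown here.
                    async with app.natalya_db_engine.acquire() as conn:
                        async with conn.begin():
                            await channel.publish(
                                payload=body,
                                exchange_name="",
                                routing_key=routing_key_for_delay_queue,
                            )
                # Acknowledge the original delivery; a copy was republished above.
                await channel.basic_client_ack(envelope.delivery_tag)

        return wrapper

    return decorator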
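
The decorator relies on a get_count_of_redelivery_attempts helper that is not shown here. One way such a counter could work is a custom message header that is incremented on every resend. The sketch below is an assumption for illustration only: the header name `x-retry-count` and both function names are hypothetical, and the functions work on a plain headers dictionary rather than on the properties object.

```python
RETRY_HEADER = "x-retry-count"  # hypothetical header name


def get_retry_count(headers):
    """Return how many times the message was already retried (0 if never)."""
    if not headers:
        return 0
    try:
        return max(int(headers.get(RETRY_HEADER, 0)), 0)
    except (TypeError, ValueError):
        # A malformed counter is treated as "never retried".
        return 0


def headers_for_retry(headers):
    """Copy the headers and increment the retry counter for the resend."""
    updated = dict(headers or {})
    updated[RETRY_HEADER] = get_retry_count(headers) + 1
    return updated
```

With a counter like this, the resend step would publish the message with `headers_for_retry(...)`, and the check against the retry limit would read the value back with `get_retry_count(...)`.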

We currently use this scheme only for periodic sequential tasks, but it can also be used when a task must start at a fixed time, without the start time drifting by the duration of the execution itself. To do this, it is enough to reschedule the task as soon as the message reaches the supervisor.
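
A minimal sketch of that "reschedule first" variant is shown below. It is not our production code: `reschedule_first` and `publish_to_delay_queue` are hypothetical names, and the publisher is any coroutine function that puts the body back into the delay queue. The demo uses a fake publisher so that it runs without a broker.

```python
import asyncio
from functools import wraps


def reschedule_first(publish_to_delay_queue):
    """Republish the message before running the task."""
    def decorator(func):
        @wraps(func)
        async def wrapper(body):
            # The next run is scheduled first, so its start time does not
            # depend on how long (or how successfully) this run goes.
            await publish_to_delay_queue(body)
            return await func(body)

        return wrapper

    return decorator


async def demo():
    published = []

    async def fake_publish(body):
        # Stands in for a real publish to the delay queue.
        published.append(body)

    @reschedule_first(fake_publish)
    async def task(body):
        return len(body)

    result = await task(b"ping")
    return published, result


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keep in mind that with this variant the next message can mature while the current task is still running, so the guarantee that a new task waits for the previous one no longer holds.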

True, this approach has additional overhead costs. You need to understand that in case of an error, the message will return to the queue, another worker will pick it up and immediately start executing it. Therefore, you need to separate error handling according to the degree of criticality and think in advance about what to do with the message in case of this or that error.

Possible options:

  1. The error will fix itself (for example, it is a system error): send nack and retry processing.
  2. Business logic error: you need to interrupt the cycle, so send ack.
  3. The error from point 1 is repeated too often: send reject and alert the developers. There are options here. You can create a dead letter queue where messages are held so they can be returned after the problem has been analyzed, or you can use the retry technique (specify message_ttl).
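
The three options can be sketched as a small decision function. This is only an illustration under stated assumptions: the `BusinessError` class, the `Action` names and the threshold of three attempts are hypothetical, and how the attempts are counted is left to the caller.

```python
from enum import Enum


class Action(Enum):
    ACK = "ack"            # stop the cycle, the message is done
    NACK_REQUEUE = "nack"  # return the message and try again
    REJECT = "reject"      # give up and alert the developers


class BusinessError(Exception):
    """Hypothetical marker for errors that a retry cannot fix."""


MAX_ATTEMPTS = 3  # illustrative threshold


def decide(error, attempts):
    """Pick what to do with a message after `error` on attempt number `attempts`."""
    if isinstance(error, BusinessError):
        # Option 2: retrying will not help, interrupt the cycle.
        return Action.ACK
    if attempts >= MAX_ATTEMPTS:
        # Option 3: the transient error keeps repeating.
        return Action.REJECT
    # Option 1: the error will probably fix itself.
    return Action.NACK_REQUEUE
```

A callback wrapper would then map the returned action to the corresponding ack, nack or reject call on the channel.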

Decorator example:

from functools import wraps


def auto_ack_or_nack(log_message):
    def decorator(func):
        @wraps(func)
        async def wrapper(channel, body, envelope, properties):
            try:
                res = await func(channel, body, envelope, properties)
                await channel.basic_client_ack(envelope.delivery_tag)
                return res
            except Exception as e:
                # requeue=False: the message is dropped or dead-lettered
                # instead of going back to the same queue.
                await channel.basic_client_nack(envelope.delivery_tag, requeue=False)
                log_error(log_message, exception=e)

        return wrapper

    return decorator

This scheme has been working for us for half a year. It is quite reliable and requires practically no attention. An application crash does not disrupt the scheduler and only slightly delays the execution of tasks.

There are no pluses without minuses. This scheme also has a critical vulnerability. If something happened to RabbitMQ and the messages disappeared, then you need to manually look at what was lost and start the loop again. But this is an extremely unlikely situation in which you will have to think about this service last 🙂

PS If the topic of scheduling periodic tasks seems interesting to you, then in the next article, I will tell you in more detail how the automation of queuing is arranged, as well as about Supervisor.
