Delayed Retries with RabbitMQ

My name is Aleksey Kazakov, and I'm the technical lead of the Client Communications team at DomKlik. In this article I want to share a "recipe" that allowed us to implement deferred retries with the RabbitMQ message broker.

Introduction

At DomKlik, a significant part of the interaction between services is implemented asynchronously via the RabbitMQ message broker. A typical interaction scheme looks like this:

[Diagram: schema_0]

  • Service A, on its RabbitMQ virtual host (service_a_vh):
    • creates a RabbitMQ exchange (service_a_inner_exch), to which other services will publish task messages for service A,
    • creates a RabbitMQ queue (service_a_input_q), from which messages will be delivered to service A,
    • binds service_a_input_q to service_a_inner_exch.
  • Service B, having been granted access to service_a_vh, publishes messages to service_a_inner_exch that should be processed by service A.

Typically, service B needs the results of a published task. For this, a reverse RabbitMQ exchange is created, into which service A publishes the results, and other services receive only the data they need via RabbitMQ routing keys. But for our "recipe" this will not be needed.

Great RabbitMQ tutorials can be found on their website.

Problem statement

Our team delivers all kinds of SMS, push notifications, and emails to customers, and for this we use third-party providers that are outside our area of responsibility. In general, the scheme looks like this: service A interacts synchronously over HTTP with an external service E. Sometimes service E has problems: it stops responding, times out, or returns 5xx errors. If several HTTP retries with increasing delays do not help and service E still refuses to work correctly, what should we do with the message?
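
In code, such in-process HTTP retries might look like the sketch below (the helper name, URL, and delays are made up for illustration; the requests library stands in for whatever HTTP client is used):

import time

import requests

SERVICE_E_URL = 'https://service-e.example/send'  # hypothetical endpoint

def call_service_e(payload, attempts=3, base_delay=1.0):
    """Hypothetical helper: call service E over HTTP, retrying with increasing delays."""
    for attempt in range(attempts):
        try:
            response = requests.post(SERVICE_E_URL, json=payload, timeout=5)
            response.raise_for_status()  # raise on 4xx/5xx responses
            return response
        except requests.RequestException:
            if attempt == attempts - 1:
                raise  # out of attempts; the caller decides what to do with the message
            time.sleep(base_delay * 2 ** attempt)  # 1 s, 2 s, 4 s, ...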

RabbitMQ allows you to reject a message with requeue, which returns the task to the queue so it is not lost. The problem is that the same task then lands in the consumer again very quickly (~100 times per second), creating extra load on service E (a real case from our practice).
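
For reference, the naive requeue in pika looks like this (contrast it with the requeue=False call used in the final code below):

# Inside a pika consumer callback: requeue=True puts the message back
# into the original queue, so it is redelivered almost immediately.
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)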

Possible solutions

1) Store the message in the application's memory while continuing to retry.

Disadvantages:

  • If the consumer is single-threaded, we block the processing of other tasks from the queue this way, while service E may be having problems only with this particular task.
  • Keeping a task in the application's memory while retries are in progress (and this may take tens of minutes) does not seem like a good idea.

2) Using the RabbitMQ dead_letter_exchange mechanism, store tasks until better times in a separate queue of dead tasks, and read them from there with a separate consumer.

Disadvantages:

  • Starting an additional consumer manually requires programmer intervention.
  • Starting and stopping it automatically is a non-trivial task that requires extra code.

3) Save tasks in a database, and redeliver them from there to the consumer after a timeout.

Disadvantages:

  • You need to write code that will do this.

Our solution

The last option is attractive in that the same consumer will handle the retried tasks. But it would be better to get rid of the need to work with a database, because "the best code is the code that was never written."

Fortunately, the deferred retry mechanism can be implemented with RabbitMQ alone.

The first thing to know is that RabbitMQ has queues with timeouts: when declaring a queue, you can specify the x-message-ttl argument, which determines how many milliseconds a message will exist in the queue before it is marked "dead".
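
In isolation this looks like the minimal sketch below (example_q is a made-up name; channel is an already-open pika channel):

# messages placed in example_q will be marked "dead" after 5000 ms
channel.queue_declare(
    queue='example_q',
    arguments={'x-message-ttl': 5000},
)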

Below are a diagram of the queues, a description of the task's route, and minimal Python code that reproduces the scheme.

[Diagram: scheme_1]

All elements of the scheme have been described above, except for the path from dead_letter_queue to service_a_inner_exch. This "loop" arises because we specify service_a_inner_exch as the dead letter exchange for dead_letter_queue. This is the main idea: we loop the message's path, sending it after a timeout from dead_letter_queue back to the original exchange.

The task's path:

  • service B publishes a message to service_a_inner_exch,
  • the message lands in the queue service_a_input_q,
  • service A cannot process the message and rejects it,
  • the message goes to dead_letter_exchange,
  • and from there immediately to dead_letter_queue,
  • the message spends 5 minutes in this queue and is then marked "dead",
  • the "dead" message goes to the dead letter exchange of the dead_letter_queue queue, which is service_a_inner_exch.

The number of "laps" a single task makes can be limited by analyzing the headers that are updated as the message passes through the dead letter exchange. This is shown in the sample code below.
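
For reference, here is roughly what the consumer sees in the x-death header on its second attempt (the field set follows the RabbitMQ documentation, but exact contents vary between versions, so treat this as a schematic illustration):

# Schematic contents of the x-death header (values are illustrative):
x_death_example = [{
    'count': 1,                      # incremented on each new "death" in this queue for this reason
    'reason': 'expired',             # 'expired' for a TTL timeout, 'rejected' for basic_reject
    'queue': 'dead_letter_queue',    # the queue in which the message "died"
    'exchange': 'dead_letter_exchange',
    'routing-keys': [''],
    'time': '2020-05-02 12:17:35',   # actually an AMQP timestamp, shown as text here
}]                                   # one entry per (queue, reason) pair, most recent first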

The code is written in Python 3.6.2 using the pika == 0.10.0 library.

publisher.py

import pika

import settings

def init_rmq():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
    ))
    channel = connection.channel()

    channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')

    return channel, connection

if __name__ == '__main__':
    channel, connection = init_rmq()

    channel.basic_publish(exchange=settings.RMQ_INPUT_EXCHANGE, routing_key='', body='message from rmq')
    connection.close()

consumer.py

import logging

import pika

import settings

logger = logging.getLogger(__name__)

def init_rmq():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
    ))
    channel = connection.channel()

    # create service_a_inner_exch
    channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')

    # create dead_letter_exchange
    channel.exchange_declare(exchange=settings.RMQ_DEAD_EXCHANGE, exchange_type='fanout')

    # create service_a_input_q
    channel.queue_declare(
        queue=settings.RMQ_INPUT_QUEUE,
        durable=True,
        arguments={
            # thanks to this argument, messages from service_a_input_q
            # will go to dead_letter_exchange when rejected/nack-ed
            'x-dead-letter-exchange': settings.RMQ_DEAD_EXCHANGE,
        }
    )

    # create the queue for "dead" messages
    channel.queue_declare(
        queue=settings.RMQ_DEAD_QUEUE,
        durable=True,
        arguments={
            # messages will live in this queue for RMQ_DEAD_TTL
            # milliseconds before being marked "dead"
            'x-message-ttl': settings.RMQ_DEAD_TTL,
            # also remember that the "dead" message queue
            # must have its own dead letter exchange
            'x-dead-letter-exchange': settings.RMQ_INPUT_EXCHANGE,
        }
    )
    # bind the "dead" message queue to dead_letter_exchange
    channel.queue_bind(
        exchange=settings.RMQ_DEAD_EXCHANGE,
        queue=settings.RMQ_DEAD_QUEUE,
    )

    # bind the main queue to the input exchange
    channel.queue_bind(settings.RMQ_INPUT_QUEUE, settings.RMQ_INPUT_EXCHANGE)

    return channel

def callback(ch, method, properties, body):
    logger.info('Processing message `%s`', body)
    if can_retry(properties):
        logger.warning('Retrying message')
        # requeue=False sends the message to the dead letter exchange, not back to the original queue
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return

    logger.error('Can`t retry, drop message')
    ch.basic_ack(delivery_tag=method.delivery_tag)

def can_retry(properties):
    """
    Заголовок x-death проставляется при прохождении сообщения через dead letter exchange.
    С его помощью можно понять, какой "круг" совершает сообщение.
    """
    deaths = (properties.headers or {}).get('x-death')
    if not deaths:
        return True
    if deaths[0]['count'] >= settings.RETRY_COUNT:
        return False
    return True

if __name__ == '__main__':
    channel = init_rmq()

    logger.info('Consuming.')
    channel.basic_consume(
        queue=settings.RMQ_INPUT_QUEUE, consumer_callback=callback,
    )
    channel.start_consuming()

settings.py

import logging.config

RMQ_HOST = ''
RMQ_PORT = 5672
RMQ_VHOST = ''
RMQ_USERNAME = ''
RMQ_PASSWORD = ''
RMQ_INPUT_EXCHANGE = ''
RMQ_INPUT_QUEUE = ''
RMQ_DEAD_EXCHANGE = ''
RMQ_DEAD_QUEUE = ''
RMQ_DEAD_TTL = 60 * 1000  # 1 minute
RETRY_COUNT = 2

dict_config = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'detailed': {
            'class': 'logging.Formatter',
            'format': '%(asctime)s %(levelname)s %(name)s: %(message)s'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'detailed',
        },
    },
    'root': {
        'level': 'INFO',
        'handlers': ['console']
    },
}

logging.config.dictConfig(dict_config)

If you specify the necessary RabbitMQ connection data in settings.py, then starting consumer.py and then publisher.py will produce the following log:

...
2020-05-02 12:16:32,260 INFO __main__: Consuming.
2020-05-02 12:16:35,233 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:16:35,233 WARNING __main__: Retrying message
2020-05-02 12:17:35,241 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:17:35,241 WARNING __main__: Retrying message
2020-05-02 12:18:35,249 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:18:35,250 ERROR __main__: Can`t retry, drop message
...

That is, the code creates the scheme shown in the figure, sends one message into the system, tries to process it three times, and discards it after two retries.

Possible improvements. Different timeouts

To extend the proposed scheme, you can create several dead letter queues with different timeouts (see the sketch after this list). After passing through the dead letter exchange:

  • the routing key is preserved, so you can use a topic exchange to route messages to different dead letter queues depending on the original routing key,
  • the headers are augmented, so you can use a headers exchange to route messages to different dead letter queues depending on the original message headers.
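
A sketch of the first variant with two dead letter queues behind a topic exchange (the queue names, TTL values, and routing keys below are hypothetical):

# declare the dead letter exchange as 'topic' instead of 'fanout'
channel.exchange_declare(exchange='dead_letter_exchange', exchange_type='topic')

# short delay for messages originally published with routing key 'task.fast'
channel.queue_declare(
    queue='dead_letter_q_fast',
    durable=True,
    arguments={
        'x-message-ttl': 60 * 1000,  # 1 minute
        'x-dead-letter-exchange': 'service_a_inner_exch',
    },
)
channel.queue_bind(
    exchange='dead_letter_exchange',
    queue='dead_letter_q_fast',
    routing_key='task.fast',
)

# long delay for messages originally published with routing key 'task.slow'
channel.queue_declare(
    queue='dead_letter_q_slow',
    durable=True,
    arguments={
        'x-message-ttl': 30 * 60 * 1000,  # 30 minutes
        'x-dead-letter-exchange': 'service_a_inner_exch',
    },
)
channel.queue_bind(
    exchange='dead_letter_exchange',
    queue='dead_letter_q_slow',
    routing_key='task.slow',
)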

Possible improvements. Multiple Consumers

If several queues intended for different consumers are bound to service_a_inner_exch, the proposed scheme needs one more adjustment. For example, suppose another service, A_another, reads from the queue service_a_another_input_q, which is also bound to service_a_inner_exch. Then the current "loop" will resend the message to both queues, and both services will receive it again. To avoid this, you can create a separate exchange, dead_inner_exch, as shown in the figure below.

[Diagram: scheme_2]
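
A sketch of that wiring, assuming the names from the diagram (only service_a_input_q is bound to dead_inner_exch, so retried messages bypass service_a_another_input_q):

# a dedicated exchange used only by the retry "loop"
channel.exchange_declare(exchange='dead_inner_exch', exchange_type='fanout')

# the dead letter queue now returns expired messages to dead_inner_exch
# instead of the shared service_a_inner_exch
channel.queue_declare(
    queue='dead_letter_queue',
    durable=True,
    arguments={
        'x-message-ttl': 5 * 60 * 1000,  # 5 minutes
        'x-dead-letter-exchange': 'dead_inner_exch',
    },
)

# only the original consumer's queue is bound to dead_inner_exch;
# service_a_another_input_q remains bound only to service_a_inner_exch
channel.queue_bind(exchange='dead_inner_exch', queue='service_a_input_q')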

Conclusion

The described solution allows you to implement any number of deferred retries with equal intervals between attempts. It requires minimal additional code, and all the delay and redelivery logic is performed by the RabbitMQ cluster.

This scheme has been running successfully for about 7 months, has repeatedly saved us from problems with service E, and has never required manual intervention. Operating conditions: RabbitMQ 3.6.12, 4 RPS on average, with peaks up to 40 RPS.

I hope this article helps some programmers sleep better at night.
