Mechanism for reprocessing messages on errors in Symfony Messenger

Hello! My name is Vanya, and for the last few years I have been doing backend development at Sravni. My team develops integrations with our partners’ services; we write code in PHP with the Symfony framework.

When working with integrations, we often deal with failures in third-party services, and the ability to recover from such errors is very important to us. Symfony has a great tool to solve this problem – you just need to use it correctly!

In this article, I will explain how the Messenger component of Symfony reprocesses messages in case of errors (in simple terms, its retry mechanism). I will also share my experience of using it and point out some important nuances of how it works.

What is Symfony Messenger

First, let’s remember what the Messenger component is for.

Symfony Messenger is used to exchange messages between different applications using message brokers (RabbitMQ, etc.), as well as to process messages within one application (both synchronous and asynchronous).

The concept of the Messenger component is well described in the official documentation.

In this article, we will focus on only one of the functions of this component: the retry mechanism in the context of asynchronous message processing within the application.

What problems do retries solve?

For example, let’s imagine a service where it is necessary to contact external partners via API (to apply for a loan – to a bank, for insurance – to an insurance company, and so on).

Let’s say a user has selected a loan and decides to send an application to the bank. When the application is created, we generate a message and use the Message Bus to send it to the queue for subsequent asynchronous processing. The code examples in this article assume PHP 8.2 and Symfony 6.3.
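
As a rough illustration, the message and its dispatch might look like the sketch below. The names SendLoanApplicationMessage and LoanApplicationSubmitter are made up for this example, and the message is assumed to be routed to an asynchronous transport in the Messenger configuration.

```php
<?php

declare(strict_types=1);

namespace App\Message;

use Symfony\Component\Messenger\MessageBusInterface;

// Hypothetical message: it carries only the identifier the handler needs.
final readonly class SendLoanApplicationMessage
{
    public function __construct(public int $applicationId)
    {
    }
}

// Hypothetical service that is called when the user submits the application.
final class LoanApplicationSubmitter
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function submit(int $applicationId): void
    {
        // If the message class is routed to an async transport, dispatch()
        // only puts it into the queue; the handler runs later in a worker.
        $this->bus->dispatch(new SendLoanApplicationMessage($applicationId));
    }
}
```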

Such a message has a handler, whose main task is to send the corresponding request to the partner.

With this method of interaction, there is always a risk that the external API may not be available due to network problems, application failure, technical work, or other problems.

If, having received an error from a third-party service (for example, 502 Bad Gateway), we stop processing the message and simply show the user a notification like “Failed to submit the application due to a technical error,” all parties lose: the user, who is dissatisfied with the level of service; the partner, who did not receive a potential client; and us, who did not meet the expectations of either.

To smooth out such problems, it is logical to try to send the application one more time (or more) after a certain period of time, which may ultimately lead to success.

This is where the retry mechanism from Symfony Messenger comes to the rescue. Let’s take a closer look at it.

The magic of retries

The central piece of the retry mechanism is the class SendFailedMessageForRetryListener. It listens to the WorkerMessageFailedEvent event and, when a number of conditions are met, sends the message back to the transport from which it was received. The message is marked with a special DelayStamp, which indicates the required waiting time before the next attempt, so the message goes back into the queue.

The decision to retry is influenced by two factors:

  1. error type – class of exception thrown in the message handler;

  2. the Retry Strategy implementation configured for the transport of the current message.

By throwing certain exceptions inside the message handler, we can explicitly influence whether a retry will occur or not.

So, if you throw an exception in the handler that implements UnrecoverableExceptionInterface, then we prevent the retry. This makes sense when it is obvious that retries will not solve the problem automatically.

For example, in the handler a request is sent to the API, to which the server returns, say, a response with code 400 or 401 – there is no point in sending the same request again. If the request body is not valid, or the request does not contain the headers necessary for authorization, you should first correct the errors and then resend it.
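
Here is a minimal sketch of this idea, assuming the handler can see the HTTP status code of the partner’s response. PartnerResponseGuard is a hypothetical helper invented for the example; UnrecoverableMessageHandlingException is the ready-made Messenger exception that implements UnrecoverableExceptionInterface.

```php
<?php

declare(strict_types=1);

namespace App\MessageHandler;

use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;

// Hypothetical helper called by a message handler after the partner API responds.
final class PartnerResponseGuard
{
    public static function assertSuccessful(int $statusCode): void
    {
        if ($statusCode < 400) {
            return;
        }

        // 400 and 401 will not fix themselves: forbid the retry explicitly.
        if (\in_array($statusCode, [400, 401], true)) {
            throw new UnrecoverableMessageHandlingException(
                sprintf('Partner API rejected the request with status %d.', $statusCode)
            );
        }

        // Any other error is thrown as an ordinary exception, so the decision
        // about a retry is left to the retry strategy of the transport.
        throw new \RuntimeException(
            sprintf('Partner API request failed with status %d.', $statusCode)
        );
    }
}
```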

If you throw an exception that implements RecoverableExceptionInterface, then we require the message to be sent for reprocessing.

This makes sense when it is obvious that the error is temporary. For example, the partner’s API answered your request with status 429, which means the API request limit has been exceeded. In this case you can throw a dedicated exception, for example a TooManyApiRequestsException that implements RecoverableExceptionInterface.
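
A possible shape of such an exception is sketched below. The namespace and the retryAfterSeconds property are my assumptions for this example: the idea is that the handler reads the Retry-After response header and passes the value along with the exception.

```php
<?php

declare(strict_types=1);

namespace App\Messenger\Retry;

use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;

// Thrown by a handler when the partner API answers with 429 Too Many Requests.
// Implementing RecoverableExceptionInterface forces Messenger to retry the message.
final class TooManyApiRequestsException extends \RuntimeException implements RecoverableExceptionInterface
{
    public function __construct(
        // Number of seconds to wait, taken by the handler from the Retry-After header.
        public readonly int $retryAfterSeconds,
        ?\Throwable $previous = null,
    ) {
        parent::__construct('API request limit exceeded (HTTP 429).', 429, $previous);
    }
}
```

In a handler it would be thrown with the parsed header value, for example new TooManyApiRequestsException(30). Keep in mind that Retry-After may also contain an HTTP date instead of a number of seconds; this sketch leaves that conversion to the handler.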

Next, Retry Strategy comes into play.

If you look closely at RetryStrategyInterface, it becomes clear that the strategy answers two questions:

  1. Should the message be retried? The isRetryable method is called only if the error is neither Recoverable nor Unrecoverable, that is, the exception does not implement either of the interfaces mentioned above.

  2. How long should we wait before the next attempt? The getWaitingTime method is called if a retry is necessary.

The implementation of these two methods determines how long to wait before the next retry and whether it will happen at all.

The thing to keep in mind here is that when you configure a transport for asynchronous messages, a retry strategy is created by default, whether you like it or not!

If you set up your transport as in the example from the documentation, you will get a transport that uses MultiplierRetryStrategy by default. In case of an error, three additional attempts will then be made, with delays of 1, 2 and 4 seconds respectively.
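
For reference, this is roughly what the default settings look like when written out explicitly (the transport name async and the DSN variable follow the documentation example). The delay before retry number n is delay × multiplier^(n−1), which gives 1000, 2000 and 4000 milliseconds here.

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                # These values are the defaults; leaving the block out gives the same result.
                retry_strategy:
                    max_retries: 3
                    delay: 1000       # milliseconds before the first retry
                    multiplier: 2     # every next delay is multiplied by this factor
                    max_delay: 0      # upper limit in milliseconds, 0 means no limit
```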

The configuration lets you set the maximum number of retries and adjust the interval between them, and, if necessary, even specify your own strategy as a service (we will consider such an example later).

A few words about where exactly the default strategy for a transport comes from. I won’t go into details; Symfony DI and the Service Container are a separate big topic that deserves its own article (or better yet, several). I will only note that this is implemented in FrameworkBundle: the default settings are declared in Configuration.php, and in FrameworkExtension.php the retry strategy with these settings is automatically registered in the service container. But let’s get back to retries.

If all retries fail (or there were none at all), the message is simply removed from the queue, that is, it is lost! To avoid this, you can (and most often should) configure a Failure Transport (an analogue of a Dead Letter Queue), which stores the messages that could not be processed.
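
A minimal configuration of a failure transport, close to the one in the Symfony documentation, might look like this. The transport name failed and the Doctrine DSN are just one possible choice.

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        # Messages that exhausted their retries are sent here instead of being dropped.
        failure_transport: failed

        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            failed:
                dsn: 'doctrine://default?queue_name=failed'
```

The stored messages can then be inspected and retried manually with the console commands messenger:failed:show and messenger:failed:retry.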

Sending a message to the corresponding failure transport (each transport can have its own failure transport configured) happens in one more listener of the WorkerMessageFailedEvent event, SendFailedMessageToFailureTransportListener. There doesn’t seem to be anything complicated there, but if you still have questions after reading the source code, be sure to write in the comments and I’ll be happy to help you figure it out.

Sometimes you need to turn retries off completely. To do this, just change the configuration by specifying max_retries: 0 in the strategy settings.
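
For the async transport from the earlier examples (the name is an assumption carried over from the documentation), this could look as follows.

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    # No retries: a failed message is removed from the queue or goes
                    # straight to the failure transport, if one is configured.
                    max_retries: 0
```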

This is useful, for example, when the API that receives the request is not idempotent: in some cases even an unsuccessful request can cause side effects, so a repeated request to the API may produce a different result (for instance, a different error that is a consequence of the previous failed request).

Separating retry logic

Another rather important nuance is worth examining in detail: the retry strategy applies to all messages received from a given transport!

Let’s say you have one transport in your project called async, which uses the default retry strategy (just like in the most basic version from the Symfony documentation, which we have already discussed above). Through this transport you send several different message classes to the queue.
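
For illustration, two such messages could be as simple as the sketch below. FooApiMessage is a name invented for this example and the applicationId field is an assumption; both classes are assumed to be routed to the same async transport.

```php
<?php

declare(strict_types=1);

namespace App\Message;

// Processed by a handler that calls one partner API (hypothetical "Foo").
final readonly class FooApiMessage
{
    public function __construct(public int $applicationId)
    {
    }
}

// Processed by a different handler that calls another partner API ("Bar").
final readonly class BarApiMessage
{
    public function __construct(public int $applicationId)
    {
    }
}
```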

Each of these messages has its own handler, and each handler makes requests to a different API.

It is reasonable to assume that the logic for working with different APIs will most likely be different. But the error handling logic for different APIs can also differ, as can the required number of retries and the intervals between them! And we have one common retry strategy for the whole transport. So what should we do?

Probably the simplest option that comes to mind is to use different transports for messages destined for different APIs. In this case, you can easily implement a unique retry strategy for each use case and assign it to the desired transport. It’s great, isn’t it? But it’s not that simple.

Imagine that you have dozens or even hundreds of third-party services with different APIs, with their own transports and retry strategies.

This is no longer very convenient, is it? Hundreds of transports and strategies! There must be some alternative solution, right? And there is!

To avoid creating a lot of transports, it would be nice to have some kind of dynamic retry strategy that could work differently in different conditions!

For example, never retry messages of the BarApiMessage class, but always retry in the event of a TooManyApiRequestsException (we already talked about it above), regardless of the type (class) of the message, and use the value from the Retry-After HTTP header as the waiting time.

Unfortunately, Symfony Messenger does not have this functionality out of the box, but the good news is that it is quite easy to implement yourself! Below is one of the possible implementation options.

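First we need a ConditionalRetryStrategyInterface: a retry strategy that additionally exposes a supports method, which tells whether the strategy applies to the given message and error.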
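
One way to express this is sketched below. The namespace and the exact signature of supports are my assumptions; the arguments simply mirror those of the standard RetryStrategyInterface methods.

```php
<?php

declare(strict_types=1);

namespace App\Messenger\Retry;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;

// A regular retry strategy that can also say whether it is applicable
// to a particular failed message and the exception that caused the failure.
interface ConditionalRetryStrategyInterface extends RetryStrategyInterface
{
    public function supports(Envelope $message, ?\Throwable $throwable = null): bool;
}
```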

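Then we implement DynamicRetryStrategy, which goes through the conditional strategies and delegates the decision to the first one that supports the current message and error, falling back to a default strategy otherwise.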
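
A minimal sketch of such a class follows. It assumes that every conditional strategy is a RetryStrategyInterface with an extra supports(Envelope, ?Throwable) method, and the constructor argument names are my own choice.

```php
<?php

declare(strict_types=1);

namespace App\Messenger\Retry;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;

final class DynamicRetryStrategy implements RetryStrategyInterface
{
    /**
     * @param iterable<ConditionalRetryStrategyInterface> $conditionalStrategies
     *        typically a tagged iterator, already ordered by priority
     */
    public function __construct(
        private readonly iterable $conditionalStrategies,
        private readonly RetryStrategyInterface $defaultStrategy,
    ) {
    }

    public function isRetryable(Envelope $message, ?\Throwable $throwable = null): bool
    {
        return $this->resolve($message, $throwable)->isRetryable($message, $throwable);
    }

    public function getWaitingTime(Envelope $message, ?\Throwable $throwable = null): int
    {
        return $this->resolve($message, $throwable)->getWaitingTime($message, $throwable);
    }

    // Picks the first conditional strategy that supports this failure,
    // or the default strategy when none of them does.
    private function resolve(Envelope $message, ?\Throwable $throwable): RetryStrategyInterface
    {
        foreach ($this->conditionalStrategies as $strategy) {
            if ($strategy->supports($message, $throwable)) {
                return $strategy;
            }
        }

        return $this->defaultStrategy;
    }
}
```

The strategy is resolved separately in isRetryable and getWaitingTime on purpose: for recoverable exceptions Messenger skips isRetryable and asks only for the waiting time.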

Now let’s configure this strategy as the retry strategy service for our async transport.
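
The configuration could look roughly like this. The tag name app.conditional_retry_strategy, the service id of the default strategy and the constructor argument names $conditionalStrategies and $defaultStrategy are assumptions made for the sketch; adapt them to your own classes.

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    # Use our own service instead of the built-in multiplier strategy.
                    service: App\Messenger\Retry\DynamicRetryStrategy

# config/services.yaml
services:
    _defaults:
        autowire: true
        autoconfigure: true

    _instanceof:
        # Every conditional strategy gets the tag automatically.
        App\Messenger\Retry\ConditionalRetryStrategyInterface:
            tags: ['app.conditional_retry_strategy']

    App\:
        resource: '../src/'

    # Fallback for failures that no conditional strategy supports:
    # 3 retries, 1000 ms initial delay, multiplier 2, no maximum delay.
    app.messenger.default_retry_strategy:
        class: Symfony\Component\Messenger\Retry\MultiplierRetryStrategy
        arguments: [3, 1000, 2, 0]

    App\Messenger\Retry\DynamicRetryStrategy:
        arguments:
            $conditionalStrategies: !tagged_iterator app.conditional_retry_strategy
            $defaultStrategy: '@app.messenger.default_retry_strategy'
```

Note that once a service is specified for the transport, the max_retries, delay and multiplier options of that transport no longer apply, so the fallback strategy has to be declared explicitly.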

Almost done! The last thing left is to create as many implementations of ConditionalRetryStrategyInterface as your specific scenarios need.

For example, I’ll implement a conditional retry strategy that handles TooManyApiRequestsException.
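
Below is one possible sketch. Compact stand-ins for the interface and the exception are included so that the file runs on its own; their shape (the supports signature and the retryAfterSeconds property) is my assumption. The strategy walks the getPrevious() chain because the worker is expected to pass the handler’s exception wrapped in HandlerFailedException.

```php
<?php

declare(strict_types=1);

namespace App\Messenger\Retry;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;

// Stand-in: a retry strategy that knows when it is applicable.
interface ConditionalRetryStrategyInterface extends RetryStrategyInterface
{
    public function supports(Envelope $message, ?\Throwable $throwable = null): bool;
}

// Stand-in: carries the Retry-After value (in seconds) read by the handler.
final class TooManyApiRequestsException extends \RuntimeException implements RecoverableExceptionInterface
{
    public function __construct(public readonly int $retryAfterSeconds)
    {
        parent::__construct('API request limit exceeded (HTTP 429).', 429);
    }
}

final class TooManyApiRequestsRetryStrategy implements ConditionalRetryStrategyInterface
{
    private const FALLBACK_DELAY_MS = 1000;

    public function supports(Envelope $message, ?\Throwable $throwable = null): bool
    {
        return null !== $this->findRateLimitException($throwable);
    }

    public function isRetryable(Envelope $message, ?\Throwable $throwable = null): bool
    {
        // Rate limiting is temporary, so this strategy always allows a retry.
        return true;
    }

    public function getWaitingTime(Envelope $message, ?\Throwable $throwable = null): int
    {
        $exception = $this->findRateLimitException($throwable);
        if (null === $exception) {
            return self::FALLBACK_DELAY_MS;
        }

        // Messenger expects milliseconds, Retry-After is given in seconds.
        return max(0, $exception->retryAfterSeconds) * 1000;
    }

    // Looks for the rate limit exception in the given throwable and its previous ones.
    private function findRateLimitException(?\Throwable $throwable): ?TooManyApiRequestsException
    {
        for ($current = $throwable; null !== $current; $current = $current->getPrevious()) {
            if ($current instanceof TooManyApiRequestsException) {
                return $current;
            }
        }

        return null;
    }
}
```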

That’s it!

Features and Limitations

The example with the “dynamic” retry strategy clearly shows how flexible the retry mechanism in Symfony Messenger is!

You can create a default strategy and implement specific cases as “conditional” strategies, whose retry logic can use the message itself, the exception object, and even the stamps from the Envelope.

An additional bonus is the ability to easily reuse retry logic for certain errors that require the same handling regardless of the message or the details of working with a specific API, which is much more difficult to achieve with many separate transports and retry strategies.

On the other hand, keep in mind that in order to call the ConditionalRetryStrategyInterface::supports method while traversing the tagged iterator $conditionalStrategies (under the hood it is a \Generator), instances of the conditional strategies are created as the iteration reaches them, and they then stay in memory (in the service container). This is unlikely to be a real problem, but with a very large number of conditional strategies it can lead to high RAM consumption, so it should at least be taken into account.

The most popular conditional strategies should be assigned a higher priority in order to create objects of rarely used strategies less often.

In general, a good understanding of the retry mechanism for failed messages and its correct use is important when the message handler sends requests to other services (and, in fact, in other cases too).

For example, retrying too often and for too long is an anti-pattern known as a Retry Storm. As a result of such retries, the service receiving the requests may simply fail to recover from a crash because of the excessive load caused by a flurry of repeated requests, so such situations should be avoided. I can recommend an excellent article from Yandex about ways to avoid this.

Conclusions

In this article, we took a detailed look at how the retry mechanism in Symfony Messenger is structured and how it works, examined the basic configurations, their limitations and nuances, as well as some specific scenarios, and implemented our own retry strategy.

I hope this helps you avoid surprises in production projects that use Symfony Messenger to work with message queues, and also helps you implement strategies specific to your projects.

PS Do you think it would be nice to have functionality similar to DynamicRetryStrategy in Symfony Messenger out of the box? Maybe it’s worth improving this idea and creating a PR with such a feature?
