Symfony Messenger: Bundling Messages

Sometimes you want to make sure that messages in Symfony Messenger are sent to the consumer in batches rather than singly. We recently needed to send updated lines of text from our programs to a translation service provider via Messenger.

But due to the strict limitation on the data transfer rate on the part of the translation company, we cannot send messages one at a time. Therefore, it is necessary to implement the following sending algorithm: first keep all received messages intended for a given consumer, and then send all messages if the waiting time for new messages has exceeded ten seconds or if more than 100 messages are stored

Let’s show how we did it:

// Symfony Messenger Message:
class TranslationUpdate
{
    public function __construct(
        public string $locale,
        public string $key,
        public string $value,
    ) {
    }
}
class TranslationUpdateHandler implements MessageHandlerInterface
{
    private const BUFFER_TIMER = 10; // in seconds
    private const BUFFER_LIMIT = 100;
    private array $buffer = [];

    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
        pcntl_async_signals(true);
        pcntl_signal(SIGALRM, Closure::fromCallable([$this, 'batchBuffer']));
    }

    public function __invoke(TranslationUpdate $message): void
    {
        $this->buffer[] = $message;

        if (count($this->buffer) >= self::BUFFER_LIMIT) {
            $this->batchBuffer();
        } else {
            pcntl_alarm(self::BUFFER_TIMER);
        }
    }

    private function batchBuffer(): void
    {
        if (0 === count($this->buffer)) {
            return;
        }

        $translationBatch = new TranslationBatch($this->buffer);
        $this->messageBus->dispatch($translationBatch);
        $this->buffer = [];

    }
}

Here we are dealing with a Messenger message that is sent every time we have an updated text for translation (the same principle can be applied to any other messages).

Our message handler will receive all messages and put them in an array buffer. If the number of elements in the buffer reaches 100, or if no new elements appear within ten seconds, the method is triggered batchBuffer

To implement a ten second timer, we use the function pcntl_alarmwhich allows you to call the method asynchronously batchBuffer as needed.

To handle system signals in our PHP code, we use the PCNTL functions (you can read more about them in PHP documentation, as well as in our blogif you speak French). We have set up a timer that will send a SIGALRM signal to the process after the specified number of seconds. Then, when the signal is received by the process, the callback function we specified as the second argument will run pcntl_signal… The callback is set for the whole application, so we can use this trick with bundling messages into batches only once

Then in the method batchBuffer we are using a new transmission in Messenger (see call dispatch) to keep track of messages in case of problems, and since the method is implemented through PCNTL, the Messenger component will not retry handling on exception.

class TranslationBatch
{
    /**
     * @param TranslationUpdate[] $notifications
     */
    public function __construct(
        private array $notifications,
    ) {
    }
}
class TranslationBatchHandler implements MessageHandlerInterface
{
    public function __invoke(TranslationBatch $message): void
    {
      // handle all our messages
    }
}

So now we have a packet handler that will always receive a list of messages to send. With it, we can easily bundle our Messenger messages into packages without having to resort to using cron.

Addition. This approach is just a proof of concept. If you want to apply it in a production environment, I recommend using a more resilient buffer storage like Redis.


The translation of the material was prepared as part of the course “Symfony Framework”… We invite everyone to a two-day online intensive “Creation of a statistics system for an online store”… On the intensive we:
– let’s begin our acquaintance with Symfony and ClickHouse (more precisely, we will build a system for collecting statistics in ClickHouse). On the basis of such a system, in the future you will be able to build and develop solutions for Business Intelligence systems and operational statistics,
– then we will deploy the API,
– and with its help, let’s look at the tools of the statistics itself.
Registration on the first day here

Similar Posts

Leave a Reply