Using symfony/messenger and symfony/console as standalone components

In this article I will skip creating the message class and its handler, since that part is described in detail in the documentation.
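For readers who want something concrete to follow along with, here is a minimal sketch of what such a pair might look like. It is only an assumption based on how the classes are used later in the article: EmailMessage is constructed with a single argument, and the handler receives the container and is placed in a HandlersLocator, which expects callables. The getContent() method and the 'mailer' service with its send() method are hypothetical names, not part of the original code.

```php
<?php

declare(strict_types=1);

use Psr\Container\ContainerInterface;

// A message is a plain object that carries the data to be processed.
final class EmailMessage
{
    /** @var string */
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

// The handler is invokable, so an instance can be used wherever a callable is expected.
final class EmailMessageHandler
{
    /** @var ContainerInterface */
    private $container;

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
    }

    public function __invoke(EmailMessage $message): void
    {
        // Hypothetical: 'mailer' is an illustrative service id, not defined in the article.
        $this->container->get('mailer')->send($message->getContent());
    }
}
```

Making the handler invokable keeps the bus configuration short, because the handler object itself can be listed for the message class without a wrapping closure.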

So, let's get started.

We configure the connection in the container using a DSN. For RabbitMQ it might look like this:

AmqpConnection::class => function (ContainerInterface $container) {
    return AmqpConnection::fromDsn('amqp://guest:guest@rabbitmq:5672/%2f/messages', []);
}

And for Redis like this:

RedisConnection::class => function (ContainerInterface $container) {
    return RedisConnection::fromDsn('redis://redis:6379/messages', [
        'stream' => 'messages',
        'group' => 'default',
        'consumer' => 'default',
    ]);
}

Next, we set up receivers for both connections; we will need them in the console. Note that we register them under names: it is by the name ampq-async or redis-async that the symfony/console command messenger:consume ampq-async will find the receiver in the container.

'ampq-async' => function (ContainerInterface $container) {
    return new AmqpReceiver(
        $container->get(AmqpConnection::class)
    );
},

'redis-async' => function (ContainerInterface $container) {
    return new RedisReceiver(
        $container->get(RedisConnection::class)
    );
},

Next, we describe the message bus in the container, passing it a SendMessageMiddleware and a HandleMessageMiddleware created on the spot.

'message-bus' => function (ContainerInterface $container) {
    $handler = new EmailMessageHandler($container);
    return new MessageBus([ 
        new SendMessageMiddleware(
            new SendersLocator([ 
                EmailMessage::class => [
                    AmqpTransport::class,
                /*    RedisTransport::class, */
                ]
            ], $container)
        ),
        new HandleMessageMiddleware(
            new HandlersLocator([
                EmailMessage::class => [$handler],
            ])
        )
    ]);
}    
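The SendersLocator above resolves AmqpTransport::class through the container. With PHP-DI this most likely works through autowiring, because the transport constructor type-hints the connection class already defined in the container. If autowiring is disabled, or another PSR-11 container is used, explicit definitions are probably needed. A sketch of what they might look like, assuming the Symfony 5.1+ bridge namespaces (older versions keep these classes under Symfony\Component\Messenger\Transport\AmqpExt and RedisExt):

```php
<?php
// Fragment of config/dependencies.php; the namespaces are an assumption (Symfony 5.1+).

use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransport;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection as AmqpConnection;
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection as RedisConnection;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransport;

return [
    // The transport acts as the sender that SendMessageMiddleware looks up by this id.
    AmqpTransport::class => function (ContainerInterface $container) {
        return new AmqpTransport($container->get(AmqpConnection::class));
    },

    RedisTransport::class => function (ContainerInterface $container) {
        return new RedisTransport($container->get(RedisConnection::class));
    },
];
```

The ids used here must match the ones listed in the SendersLocator map, since the locator only passes those strings to the container's get() method.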

We also give the bus a name. The name will be needed again by the messenger:consume console command, but we'll come back to this later when we look at how the console application works.

Thus, we get a fully configured message bus. Very simple, isn’t it?

That's all. Now, in the controller, you can send a message to the bus simply by calling:

$busName="message-bus";
$bus = $this->container->get($busName);
$bus->dispatch(
    new EmailMessage($message), [
        new \Symfony\Component\Messenger\Stamp\BusNameStamp($busName)
    ]
);

Please note that in addition to the message, we pass a stamp with the bus name. Without this stamp the standard ConsumeMessagesCommand cannot be used, because RoutableMessageBus uses the name in this stamp to find the desired bus in the container.

Now about consuming our messages from the queue.

The symfony/console component is a very powerful and flexible tool. When it is used within the framework, the application uses Symfony\Bundle\FrameworkBundle\Console\Application, which receives the framework configuration, so the console can use all the commands of all available framework components.

But we will not taste such joy. If symfony/console is used as an independent component, the application has to use Symfony\Component\Console\Application as its core, and all commands have to be registered manually.

Install the console component.

composer require symfony/console

After installation, the documentation suggests creating a console file in the bin folder with the following content.

#!/usr/bin/env php
<?php
// application.php
require __DIR__.'/vendor/autoload.php';
use Symfony\Component\Console\Application;
$application = new Application();
// ... register commands
$application->run();

Now let’s arrange this file according to our needs.

First of all, let’s add the definition of our container.

$containerBuilder = new ContainerBuilder();
$containerBuilder->addDefinitions(__DIR__ . '/../config/dependencies.php');
$container = $containerBuilder->build();

Next, we need a console output object

$output = new ConsoleOutput();

and of course the definitions of the required commands

$commands = [
    new ConsumeMessagesCommand(
        new RoutableMessageBus($container),
        $container,
        new EventDispatcher(),
        new ConsoleLogger($output, [])
    ),
    new StopWorkersCommand(
        new FilesystemAdapter('', 10, __DIR__ . '/../var/cache')
    )
];    

Some clarification is required here.

The constructor of the ConsumeMessagesCommand class requires a RoutableMessageBus, which in turn receives the configured container.

In this container it will be able to find the bus by the name 'message-bus', which we specified in the bus definition earlier and passed in the message stamp.

You also need to pass the container itself, an EventDispatcher, and a ConsoleLogger built with the $output created earlier, in order to take advantage of the standard Symfony console output features, such as:

  • color of the output depending on the importance of the event

  • the ability to adjust the verbosity of the output using standard switches when running -v -vv -vvv.

Note that the $output with which we created the ConsoleLogger must subsequently be passed to the call $application->run(null, $output);

For the StopWorkersCommand, you need to pass a cache adapter so that the workers can be stopped gracefully. This avoids the situation where a worker has already taken a message from the queue but has not yet finished processing it.
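One detail is worth checking here. In Symfony 5.x, as far as I can tell, messenger:stop-workers only writes a restart timestamp to the cache, and the worker reacts to it through StopWorkerOnRestartSignalListener, which the framework normally registers on its own. In a standalone setup the EventDispatcher starts out empty, so the listener probably has to be added by hand. A sketch under that assumption; the variable names $restartSignalCache and $eventDispatcher are illustrative and do not appear in the original code:

```php
<?php
// Fragment of bin/console, assuming Symfony 5.x and the Composer autoloader.

require __DIR__ . '/../vendor/autoload.php';

use Symfony\Component\Cache\Adapter\FilesystemAdapter;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;

// The same cache location must be seen by the stop command and by the running workers.
$restartSignalCache = new FilesystemAdapter('', 10, __DIR__ . '/../var/cache');

// The listener compares the worker start time with the timestamp stored in the cache
// and asks the worker to stop once the message in progress has been handled.
$eventDispatcher = new EventDispatcher();
$eventDispatcher->addSubscriber(new StopWorkerOnRestartSignalListener($restartSignalCache));

// $eventDispatcher would then be passed to ConsumeMessagesCommand instead of a fresh
// EventDispatcher, and $restartSignalCache to StopWorkersCommand.
```

Sharing one cache pool between the command and the listener is the important part; the adapter type itself can be any PSR-6 pool that both processes can reach.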

Full code of the console file (the PHP-DI container is used, but any PSR-11 compatible container will do):

#!/usr/bin/env php
<?php

require __DIR__.'/../vendor/autoload.php';

use DI\ContainerBuilder;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\Command\StopWorkersCommand;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Cache\Adapter\FilesystemAdapter;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Console\Logger\ConsoleLogger;
use Symfony\Component\Console\Output\ConsoleOutput;
use Symfony\Component\Console\Application;

$containerBuilder = new ContainerBuilder();
$containerBuilder->addDefinitions(__DIR__ . '/../config/dependencies.php');
$container = $containerBuilder->build();

$output = new ConsoleOutput();

$commands = [
    new ConsumeMessagesCommand(
        new RoutableMessageBus($container),
        $container,
        new EventDispatcher(),
        new ConsoleLogger($output, [])
    ),
    new StopWorkersCommand(
        new FilesystemAdapter('', 10, __DIR__ . '/../var/cache')
    )
];    

$application = new Application('Console');
$application->addCommands($commands);
$application->run(null, $output);

Now you can run the console application like this: php bin/console messenger:consume ampq-async, or, for the Redis transport, php bin/console messenger:consume redis-async. Use the -v, -vv, or -vvv switches to control the verbosity of the console output, and stop the workers with the php bin/console messenger:stop-workers command.

Conclusion

After researching the messenger and console components, I appreciated the convenience of these tools.

For me, this was not only an excellent reason to understand how Symfony works inside, but also an occasion to write an article, or rather a recipe or a note.
