Turning PostgreSQL events into Laravel events

Brief summary

Last time I described how you can queue a Laravel Job from a PostgreSQL stored procedure or trigger.

In this article, I will discuss how you can convert events raised in PostgreSQL into Laravel events.

Working example posted on GitHub.

Instead of introducing

Often you need to know what is happening with the data in the database and quickly respond to it.

The easiest way to obtain information about changes is to regularly poll the database to see if there are changes that need to be responded to. And developers often do just that. This is not the most efficient method because it can cause unnecessary load on the server and requires modifying existing tables or creating new ones.

But PostgreSQL can notify clients through a mechanism called “LISTEN/NOTIFY”. Documentation is located Here And Here.

This mechanism has disadvantages:

  • Requires a long-term open connection to the database, which may not be practical.

  • The notification system does not guarantee delivery or ordering of messages and should not be used as a full-featured message queue. “LISTEN/NOTIFY” should only be used for light communication between processes.

  • The message size is limited by the row size (8192 bytes in PostgreSQL 13).

For a deeper understanding of this mechanism, you can refer to this material.

I’ll just show you how to create an Artisan command somewhat similar to the queue:work command, which will do its job and better in conjunction with Supervisor.

“Testing” the technology

To get you started quickly, I'll create a docker container:

docker run -d -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test \
-p 5433:5432 --name pgsql postgres

Please note that I use port 5433 because I have a “stationary” DBMS running on my home port.

Now we launch three terminals by issuing the command in all of them:

docker exec -it pgsql psql -U test

In the first and second terminals we issue the command:

LISTEN my_event;

This command “subscribes” the client to notifications from the PostgreSQL server through a channel called my_event. If any other client issues the NOTIFY my_event command, then all clients that issued the LISTEN my_event command will receive a notification.

In the third terminal we issue the command:

NOTIFY my_event, 'Hello, PostgreSQL!';

Then in the first and second terminals we issue the command again:

LISTEN my_event;

Here I have some misunderstanding. I thought that the first and second terminals should receive the notification automatically, but for some reason they require calling “LISTEN my_event” again.

You can play around and make sure that it is impossible to receive a notification without subscribing, as well as reading it twice.

Works!

Works!

The first two terminals can be closed. The third one can be left for experiments.

Creating a Laravel Application

composer create-project laravel/laravel listen_notify
cd listen_notify
sudo chown -R $USER:www-data storage
sudo chown -R $USER:www-data bootstrap/cache
chmod -R 775 storage
chmod -R 775 bootstrap/cache

Database connection settings in .env

DB_CONNECTION=pgsql
DB_HOST=127.0.0.1
DB_PORT=5433
DB_DATABASE=test
DB_USERNAME=test
DB_PASSWORD=test

Remove all existing migrations. We won't need them.

Now let's create templates for the artisan command, event and listener that listens to this event

php atrisan make:command ListenNotifyCommand
php artisan make:event PostgresNotificationReceived
php artisan make:listener LogPostgresNotification --event=PostgresNotificationReceived

Let's configure the EventServiceProvider and add the following value to the $listen array:

    protected $listen = [
        Registered::class => [
            SendEmailVerificationNotification::class,
        ],
        // Добавленное значение
        PostgresNotificationReceived::class => [
            LogPostgresNotification::class,
        ]
    ];

The application framework is ready

Creating a Laravel Event Listener

I won't come up with complicated logic. I’ll just write the received data to a log file

Contents of the file app/Listeners/LogPostgresNotification.php
<?php

namespace App\Listeners;

use App\Events\PostgresNotificationReceived;
use Illuminate\Support\Facades\Log;

class LogPostgresNotification
{

    public function handle(PostgresNotificationReceived $event): void
    {
        Log::info('Received Postgres notification: ', $event->notification);
    }
}

Creating a Laravel Event

I won’t philosophize either, I’ll just pass the payload through the constructor to the event

Contents of the file app/Events/PostgresNotificationReceived.php
<?php

namespace App\Events;

use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class PostgresNotificationReceived
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public function __construct(public array $notification)
    {

    }

}

Writing the logic of the listen:notify command

Let's go from simple to complex. To begin with, we will simply receive a message from PostgreSQL.

To do this, we modify the handle method, not forgetting to modify the $signature and $description fields

Contents of the file app/Console/Commands/ListenNotifyCommand.php
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PDO;

class ListenNotifyCommand extends Command
{
    protected $signature="listen:notify";

    protected $description = 'Listen to PostgreSQL notify events';

    public function handle(): void
    {
        $pdo = DB::connection()->getPdo();

        // Listen to the 'my_channel' notifications
        $pdo->exec("LISTEN my_event");
        $this->info('Starting');
        // Forever loop
        while (true) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
            }
        }
    }
}

Let's check if our team is working. Run the command in the terminal: php artisan listen:notify and in the third terminal (we didn’t close it) we serve again NOTIFY my_event, 'Hello, PostgreSQL!';

Great!  Works!

Great! Works!

A start. A Laravel application received an event from PostgreSQL.

When changing the command code, do not forget to restart the process.

Adding signal processing

A little about signals, gurus may miss

Signals are part of the POSIX standards and are used to asynchronously notify a process about an event in Unix and similar operating systems, for example, Linux. Applications on these systems can process incoming signals, for example, to stop the process (SIGTERM, SIGINT), reboot (SIGHUP), etc.

Windows does not support POSIX signals. It uses native mechanisms to manage processes and threads, including Windows API functions for sending and handling control signals such as Ctrl+C.

However, there are some environments on Windows, such as the Windows Subsystem for Linux (WSL), that provide compatibility with POSIX standards and support POSIX signals.

In the context of PHP and the command line, we are interested in the following signals:

  • SIGINT (interrupt signal). This signal is usually sent when pressing Ctrl+C in a terminal. It tells the process to stop.

  • SIGTERM (end execution). This is the standard signal to stop a process in Unix. Programs can intercept this signal and do any necessary work before terminating. If the program does not catch SIGTERM, it will exit immediately.

  • SIGKILL (kill the process immediately). This signal cannot be intercepted or ignored. When a process receives a SIGKILL, it stops immediately.

We add two fields ($hasPcntl, $running) of bool type to the ListenNotifyCommand class and initialize them. We write a method – a signal handler

    protected bool $hasPcntl = false;
    protected bool $running = true;

    private function handleSignal(int $signal): void
    {
        switch ($signal) {
            case SIGINT:
            case SIGTERM:
                $this->info( PHP_EOL . 'Received stop signal, shutting down...');
                $this->running = false;
                break;

            default:
        }
    }

Extension required for signal processing pcntl. This extension is not available for Windows, however, it is quite possible to write a cross-platform solution.

We are finalizing the handle method

    public function handle(): int
    {
        // Проверка, что модуль pcntl подключён
        $this->hasPcntl = extension_loaded('pcntl');

        if ($this->hasPcntl) {
            // Если модуль pcntl подключён, назначаем обработчики сигналов
            pcntl_signal(SIGINT, [$this, 'handleSignal']);
            pcntl_signal(SIGTERM, [$this, 'handleSignal']);
        }

        $pdo = DB::connection()->getPdo();
        $pdo->exec("LISTEN my_event");
        $this->info('Start listening');

        while ($this->running) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
            $this->info('iter');
            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
            }

            if ($this->hasPcntl) {
                // Если модуль pcntl подключён, вызываем обработчики сигналов
                pcntl_signal_dispatch();
            }
        }
        // Возвращаем 0, как код завершения
        return 0;
    }
File app/Console/Commands/ListenNotifyCommand.php
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PDO;

class ListenNotifyCommand extends Command
{
    protected $signature="listen:notify";

    protected $description = 'Listen to PostgreSQL notify events';

    protected bool $hasPcntl = false;
    protected bool $running = true;

    public function handle(): int
    {
        $this->hasPcntl = extension_loaded('pcntl');

        if ($this->hasPcntl) {
            pcntl_signal(SIGINT, [$this, 'handleSignal']);
            pcntl_signal(SIGTERM, [$this, 'handleSignal']);
        }

        $pdo = DB::connection()->getPdo();
        $pdo->exec("LISTEN my_event");
        $this->info('Start listening');

        while ($this->running) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
            $this->info('iter');
            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
            }

            if ($this->hasPcntl) {
                pcntl_signal_dispatch();
            }
        }
        return 0;
    }

    private function handleSignal(int $signal): void
    {
        switch ($signal) {
            case SIGINT:
            case SIGTERM:
                $this->info( PHP_EOL . 'Received stop signal, shutting down...');
                $this->running = false;
                break;

            default:
        }
    }

}

You can run the command and also call NOTIFY in the adjacent console, everything should work. If the command is run on Linux and the pcntl module is connected, then when you press Ctrl+C the message will be displayed: Received stop signal, shutting down… This means that the script processes the signals correctly and stops, instead of being forced to stop.

Setting up Supervisor to monitor the script

Supervisor is a handy tool for managing background processes on Unix-like operating systems. It monitors the script, automatically restarts it if it fails, and gives you the ability to manage its state, such as starting, stopping, and rebooting.

Supervisor is also compatible with Unix signals, allowing you to customize the behavior of a process based on different signals. We configured the script to handle SIGINT and SIGTERM signals to terminate it correctly, which is consistent with Supervisor.

When Supervisor sends a SIGTERM signal to a process, it expects the process to exit and transfer control back to the system.

If the process successfully handles SIGTERM and exits gracefully, an exit code of 0 is typically returned.

If the process has not returned control within a reasonable time (default 10 seconds), Supervisor sends a SIGKILL. This time can be changed in the settings, option stopwaitsecs.

Sample Supervisor configuration file

[program:postrgres_laravel]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/your/laravel/artisan listen:notify
autostart=true
autorestart=true
user=www-data
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/postrgres_laravel.log

More details about Supervisor in the Laravel documentation.

Serializing the payload

The argument to NOTIFY is always a string. Those. if we want to convey something complex, we need to serialize it. PostgreSQL can work with JSON, let's use this skill.

Let's create a stored function that takes json as input and sends it to the my_event channel.

Migration file 2024_03_05_125805_create_send_notify_function.php
<?php

use Illuminate\Database\Migrations\Migration;

return new class extends Migration {
    public function up(): void
    {
        DB::unprepared('
            CREATE OR REPLACE FUNCTION send_notify(data json) RETURNS VOID AS $$
            BEGIN
                PERFORM pg_notify(\'my_event\', data::text);
            END;
            $$ LANGUAGE plpgsql;
        ');
    }

    public function down(): void
    {
        DB::unprepared('DROP FUNCTION IF EXISTS send_notify(json);');
    }
};

Performing migration

php artisan migrate

Everything went well for me. Now we need to slightly modify the handle

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
                $payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
                $this->info('Decoded payload: ' . print_r($payload, true));
            }

Here I select $payload and output it to the terminal.

Let's check if everything works for us. Run the php artisan listen:notify command. This time in the psql terminal we will submit the following construction:

select send_notify(json_build_object('key1', 'Hello, PostgreSQL!'));

We look at the terminal, it works. Let's pass something less trivial:

select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)))
And it works again!

And it works again!

The main thing is not to get carried away and remember the 8192 byte limit.

Putting it all together

Now there is very little left. In our team, send an event so that all listeners who subscribe to it can listen to it. To do this, add just one line:

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
                $payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
                $this->info('Decoded payload: ' . print_r($payload, true));
                // Новая строка
                Event::dispatch(new PostgresNotificationReceived($payload));
            }
Full listing of the file app/Console/Commands/ListenNotifyCommand.php
<?php

namespace App\Console\Commands;

use App\Events\PostgresNotificationReceived;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Event;
use PDO;

class ListenNotifyCommand extends Command
{
    protected $signature="listen:notify";

    protected $description = 'Listen to PostgreSQL notify events';

    protected bool $hasPcntl = false;
    protected bool $running = true;


    public function handle(): int
    {
        $this->hasPcntl = extension_loaded('pcntl');

        if ($this->hasPcntl) {
            pcntl_signal(SIGINT, [$this, 'handleSignal']);
            pcntl_signal(SIGTERM, [$this, 'handleSignal']);
        }

        $pdo = DB::connection()->getPdo();
        $pdo->exec("LISTEN my_event");
        $this->info('Start listening');

        while ($this->running) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
                $payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
                $this->info('Decoded payload: ' . print_r($payload, true));
                Event::dispatch(new PostgresNotificationReceived($payload));
            }

            if ($this->hasPcntl) {
                pcntl_signal_dispatch();
            }
        }

        return 0;

    }

    private function handleSignal(int $signal): void
    {
        switch ($signal) {
            case SIGINT:
            case SIGTERM:
                $this->info( PHP_EOL . 'Received stop signal, shutting down...');
                $this->running = false;
                break;

            default:
        }
    }

}

Let's run the command: php artisan listen:notify and issue the command in the next terminal

select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)));

According to our idea, the event listener writes the payload to the log. Let's look at the log.

Payload in the log

Payload in the log

I launch a virtual machine and check how the script processes signals in Linux

It can be seen that the signal was processed and the application completed normally

It can be seen that the signal was processed and the application completed normally

It can be seen that in response to Ctrl+C the script displays the message: Received stop signal, shutting down…

What can be improved?

Most likely, the processing of the received message should be immediately thrown, as a task, into the asynchronous Laravel queue, so that processing the message does not slow down the “endless” loop, which can lead to messages being missed or the script crashing by the Supervisor process.

Further decisions about event dispatching are performed in this task.

Conclusion

You can use the send_notify function in other PostgreSQL stored functions or triggers, passing the values ​​of the TG_TABLE_NAME and TG_OP variables from the triggers to decide what and how to process. This article just shows the basics for getting events from PostgreSQL. How to apply them in practice depends only on the imagination of the developer.

Working application can be found on GitHub

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *