Turning PostgreSQL events into Laravel events
Brief summary
Last time I described how you can queue a Laravel Job from a PostgreSQL stored procedure or trigger.
In this article, I will discuss how you can convert events raised in PostgreSQL into Laravel events.
Working example posted on GitHub.
Instead of introducing
Often you need to know what is happening with the data in the database and quickly respond to it.
The easiest way to obtain information about changes is to regularly poll the database to see if there are changes that need to be responded to. And developers often do just that. This is not the most efficient method because it can cause unnecessary load on the server and requires modifying existing tables or creating new ones.
But PostgreSQL can notify clients through a mechanism called “LISTEN/NOTIFY”. Documentation is located Here And Here.
This mechanism has disadvantages:
Requires a long-term open connection to the database, which may not be practical.
The notification system does not guarantee delivery or ordering of messages and should not be used as a full-featured message queue. “LISTEN/NOTIFY” should only be used for light communication between processes.
The message size is limited by the row size (8192 bytes in PostgreSQL 13).
For a deeper understanding of this mechanism, you can refer to this material.
I’ll just show you how to create an Artisan command somewhat similar to the queue:work command, which will do its job and better in conjunction with Supervisor.
“Testing” the technology
To get you started quickly, I'll create a docker container:
docker run -d -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test \
-p 5433:5432 --name pgsql postgres
Please note that I use port 5433 because I have a “stationary” DBMS running on my home port.
Now we launch three terminals by issuing the command in all of them:
docker exec -it pgsql psql -U test
In the first and second terminals we issue the command:
LISTEN my_event;
This command “subscribes” the client to notifications from the PostgreSQL server through a channel called my_event. If any other client issues the NOTIFY my_event command, then all clients that issued the LISTEN my_event command will receive a notification.
In the third terminal we issue the command:
NOTIFY my_event, 'Hello, PostgreSQL!';
Then in the first and second terminals we issue the command again:
LISTEN my_event;
Here I have some misunderstanding. I thought that the first and second terminals should receive the notification automatically, but for some reason they require calling “LISTEN my_event” again.
You can play around and make sure that it is impossible to receive a notification without subscribing, as well as reading it twice.
The first two terminals can be closed. The third one can be left for experiments.
Creating a Laravel Application
composer create-project laravel/laravel listen_notify
cd listen_notify
sudo chown -R $USER:www-data storage
sudo chown -R $USER:www-data bootstrap/cache
chmod -R 775 storage
chmod -R 775 bootstrap/cache
Database connection settings in .env
DB_CONNECTION=pgsql
DB_HOST=127.0.0.1
DB_PORT=5433
DB_DATABASE=test
DB_USERNAME=test
DB_PASSWORD=test
Remove all existing migrations. We won't need them.
Now let's create templates for the artisan command, event and listener that listens to this event
php atrisan make:command ListenNotifyCommand
php artisan make:event PostgresNotificationReceived
php artisan make:listener LogPostgresNotification --event=PostgresNotificationReceived
Let's configure the EventServiceProvider and add the following value to the $listen array:
protected $listen = [
Registered::class => [
SendEmailVerificationNotification::class,
],
// Добавленное значение
PostgresNotificationReceived::class => [
LogPostgresNotification::class,
]
];
The application framework is ready
Creating a Laravel Event Listener
I won't come up with complicated logic. I’ll just write the received data to a log file
Contents of the file app/Listeners/LogPostgresNotification.php
<?php
namespace App\Listeners;
use App\Events\PostgresNotificationReceived;
use Illuminate\Support\Facades\Log;
class LogPostgresNotification
{
public function handle(PostgresNotificationReceived $event): void
{
Log::info('Received Postgres notification: ', $event->notification);
}
}
Creating a Laravel Event
I won’t philosophize either, I’ll just pass the payload through the constructor to the event
Contents of the file app/Events/PostgresNotificationReceived.php
<?php
namespace App\Events;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class PostgresNotificationReceived
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public function __construct(public array $notification)
{
}
}
Writing the logic of the listen:notify command
Let's go from simple to complex. To begin with, we will simply receive a message from PostgreSQL.
To do this, we modify the handle method, not forgetting to modify the $signature and $description fields
Contents of the file app/Console/Commands/ListenNotifyCommand.php
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PDO;
class ListenNotifyCommand extends Command
{
protected $signature="listen:notify";
protected $description = 'Listen to PostgreSQL notify events';
public function handle(): void
{
$pdo = DB::connection()->getPdo();
// Listen to the 'my_channel' notifications
$pdo->exec("LISTEN my_event");
$this->info('Starting');
// Forever loop
while (true) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
}
}
}
}
Let's check if our team is working. Run the command in the terminal: php artisan listen:notify and in the third terminal (we didn’t close it) we serve again NOTIFY my_event, 'Hello, PostgreSQL!';
A start. A Laravel application received an event from PostgreSQL.
When changing the command code, do not forget to restart the process.
Adding signal processing
A little about signals, gurus may miss
Signals are part of the POSIX standards and are used to asynchronously notify a process about an event in Unix and similar operating systems, for example, Linux. Applications on these systems can process incoming signals, for example, to stop the process (SIGTERM, SIGINT), reboot (SIGHUP), etc.
Windows does not support POSIX signals. It uses native mechanisms to manage processes and threads, including Windows API functions for sending and handling control signals such as Ctrl+C.
However, there are some environments on Windows, such as the Windows Subsystem for Linux (WSL), that provide compatibility with POSIX standards and support POSIX signals.
In the context of PHP and the command line, we are interested in the following signals:
SIGINT (interrupt signal). This signal is usually sent when pressing Ctrl+C in a terminal. It tells the process to stop.
SIGTERM (end execution). This is the standard signal to stop a process in Unix. Programs can intercept this signal and do any necessary work before terminating. If the program does not catch SIGTERM, it will exit immediately.
SIGKILL (kill the process immediately). This signal cannot be intercepted or ignored. When a process receives a SIGKILL, it stops immediately.
We add two fields ($hasPcntl, $running) of bool type to the ListenNotifyCommand class and initialize them. We write a method – a signal handler
protected bool $hasPcntl = false;
protected bool $running = true;
private function handleSignal(int $signal): void
{
switch ($signal) {
case SIGINT:
case SIGTERM:
$this->info( PHP_EOL . 'Received stop signal, shutting down...');
$this->running = false;
break;
default:
}
}
Extension required for signal processing pcntl. This extension is not available for Windows, however, it is quite possible to write a cross-platform solution.
We are finalizing the handle method
public function handle(): int
{
// Проверка, что модуль pcntl подключён
$this->hasPcntl = extension_loaded('pcntl');
if ($this->hasPcntl) {
// Если модуль pcntl подключён, назначаем обработчики сигналов
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
}
$pdo = DB::connection()->getPdo();
$pdo->exec("LISTEN my_event");
$this->info('Start listening');
while ($this->running) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
$this->info('iter');
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
}
if ($this->hasPcntl) {
// Если модуль pcntl подключён, вызываем обработчики сигналов
pcntl_signal_dispatch();
}
}
// Возвращаем 0, как код завершения
return 0;
}
File app/Console/Commands/ListenNotifyCommand.php
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PDO;
class ListenNotifyCommand extends Command
{
protected $signature="listen:notify";
protected $description = 'Listen to PostgreSQL notify events';
protected bool $hasPcntl = false;
protected bool $running = true;
public function handle(): int
{
$this->hasPcntl = extension_loaded('pcntl');
if ($this->hasPcntl) {
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
}
$pdo = DB::connection()->getPdo();
$pdo->exec("LISTEN my_event");
$this->info('Start listening');
while ($this->running) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
$this->info('iter');
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
}
if ($this->hasPcntl) {
pcntl_signal_dispatch();
}
}
return 0;
}
private function handleSignal(int $signal): void
{
switch ($signal) {
case SIGINT:
case SIGTERM:
$this->info( PHP_EOL . 'Received stop signal, shutting down...');
$this->running = false;
break;
default:
}
}
}
You can run the command and also call NOTIFY in the adjacent console, everything should work. If the command is run on Linux and the pcntl module is connected, then when you press Ctrl+C the message will be displayed: Received stop signal, shutting down… This means that the script processes the signals correctly and stops, instead of being forced to stop.
Setting up Supervisor to monitor the script
Supervisor is a handy tool for managing background processes on Unix-like operating systems. It monitors the script, automatically restarts it if it fails, and gives you the ability to manage its state, such as starting, stopping, and rebooting.
Supervisor is also compatible with Unix signals, allowing you to customize the behavior of a process based on different signals. We configured the script to handle SIGINT and SIGTERM signals to terminate it correctly, which is consistent with Supervisor.
When Supervisor sends a SIGTERM signal to a process, it expects the process to exit and transfer control back to the system.
If the process successfully handles SIGTERM and exits gracefully, an exit code of 0 is typically returned.
If the process has not returned control within a reasonable time (default 10 seconds), Supervisor sends a SIGKILL. This time can be changed in the settings, option stopwaitsecs.
Sample Supervisor configuration file
[program:postrgres_laravel]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/your/laravel/artisan listen:notify
autostart=true
autorestart=true
user=www-data
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/postrgres_laravel.log
More details about Supervisor in the Laravel documentation.
Serializing the payload
The argument to NOTIFY is always a string. Those. if we want to convey something complex, we need to serialize it. PostgreSQL can work with JSON, let's use this skill.
Let's create a stored function that takes json as input and sends it to the my_event channel.
Migration file 2024_03_05_125805_create_send_notify_function.php
<?php
use Illuminate\Database\Migrations\Migration;
return new class extends Migration {
public function up(): void
{
DB::unprepared('
CREATE OR REPLACE FUNCTION send_notify(data json) RETURNS VOID AS $$
BEGIN
PERFORM pg_notify(\'my_event\', data::text);
END;
$$ LANGUAGE plpgsql;
');
}
public function down(): void
{
DB::unprepared('DROP FUNCTION IF EXISTS send_notify(json);');
}
};
Performing migration
php artisan migrate
Everything went well for me. Now we need to slightly modify the handle
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
$payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
$this->info('Decoded payload: ' . print_r($payload, true));
}
Here I select $payload and output it to the terminal.
Let's check if everything works for us. Run the php artisan listen:notify command. This time in the psql terminal we will submit the following construction:
select send_notify(json_build_object('key1', 'Hello, PostgreSQL!'));
We look at the terminal, it works. Let's pass something less trivial:
select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)))
The main thing is not to get carried away and remember the 8192 byte limit.
Putting it all together
Now there is very little left. In our team, send an event so that all listeners who subscribe to it can listen to it. To do this, add just one line:
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
$payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
$this->info('Decoded payload: ' . print_r($payload, true));
// Новая строка
Event::dispatch(new PostgresNotificationReceived($payload));
}
Full listing of the file app/Console/Commands/ListenNotifyCommand.php
<?php
namespace App\Console\Commands;
use App\Events\PostgresNotificationReceived;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Event;
use PDO;
class ListenNotifyCommand extends Command
{
protected $signature="listen:notify";
protected $description = 'Listen to PostgreSQL notify events';
protected bool $hasPcntl = false;
protected bool $running = true;
public function handle(): int
{
$this->hasPcntl = extension_loaded('pcntl');
if ($this->hasPcntl) {
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
}
$pdo = DB::connection()->getPdo();
$pdo->exec("LISTEN my_event");
$this->info('Start listening');
while ($this->running) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
$payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
$this->info('Decoded payload: ' . print_r($payload, true));
Event::dispatch(new PostgresNotificationReceived($payload));
}
if ($this->hasPcntl) {
pcntl_signal_dispatch();
}
}
return 0;
}
private function handleSignal(int $signal): void
{
switch ($signal) {
case SIGINT:
case SIGTERM:
$this->info( PHP_EOL . 'Received stop signal, shutting down...');
$this->running = false;
break;
default:
}
}
}
Let's run the command: php artisan listen:notify and issue the command in the next terminal
select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)));
According to our idea, the event listener writes the payload to the log. Let's look at the log.
I launch a virtual machine and check how the script processes signals in Linux
It can be seen that in response to Ctrl+C the script displays the message: Received stop signal, shutting down…
What can be improved?
Most likely, the processing of the received message should be immediately thrown, as a task, into the asynchronous Laravel queue, so that processing the message does not slow down the “endless” loop, which can lead to messages being missed or the script crashing by the Supervisor process.
Further decisions about event dispatching are performed in this task.
Conclusion
You can use the send_notify function in other PostgreSQL stored functions or triggers, passing the values of the TG_TABLE_NAME and TG_OP variables from the triggers to decide what and how to process. This article just shows the basics for getting events from PostgreSQL. How to apply them in practice depends only on the imagination of the developer.
Working application can be found on GitHub