Queuing a Laravel Job from a PostgreSQL Stored Procedure or Trigger
Why was this needed?
There is an application A, which runs somewhere out there and to which I have no access. This application uses an API implemented with stored procedures. There is also an application B, which configures data for application A and generates reports on its operation. I do have access to application B and to the stored procedures.
Over time, application B acquired a Telegram bot. Everyone liked receiving notifications in real time, and then a request came in: when event X occurs in application A, notify the users of application B via the Telegram bot.
The task would not be difficult if application A could be made to notify application B when event X occurs.
But it was not possible to reach an agreement.
Analysis of the system showed that when event X occurs, application A calls stored procedure Y. After thinking a little, I decided to start a Job in application B simply by inserting a row into the jobs table. That job then sends the alert via the Telegram bot.
What is required for this to work?
The queue driver must be set to database. Workers must be configured and running.
Implementation
It’s better to see and touch once than to read the theory a hundred times, so I describe the implementation in the form of a tutorial for a test application.
If you think this is unnecessary, go straight to this section. It’s never too late to go back to the beginning.
Unfortunately, I did not find an elegant solution for the general case; the problem is solved head-on, and each job needs its own solution.
But I will describe the general idea.
Our test application will send an email, and the sending will be initiated by a stored procedure.
Creating a Test Application
composer create-project laravel/laravel pgsql_job
cd pgsql_job
sudo chown -R $USER:www-data storage
sudo chown -R $USER:www-data bootstrap/cache
chmod -R 775 storage
chmod -R 775 bootstrap/cache
Set up a connection to the database. If you do not have a database, you can run one in a Docker container:
docker run -d -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test \
-p 5432:5432 --name pgsql postgres
After that, set the database connection settings in .env:
DB_CONNECTION=pgsql
DB_HOST=127.0.0.1
DB_PORT=5432
DB_DATABASE=test
DB_USERNAME=test
DB_PASSWORD=test
Specify the queue driver as database (QUEUE_CONNECTION=database).
We will experiment with sending emails. To avoid configuring a mail server, set MAIL_MAILER=log so that the emails end up in the log.
Creating a notification
php artisan make:notification TestNotify
Let’s go to app/Notifications/TestNotify.php and change the constructor
public function __construct(public $subject, public $id)
{
//
}
and method toMail()
public function toMail(object $notifiable): MailMessage
{
return (new MailMessage)
->subject($this->subject)
->line('ID: ' . $this->id);
}
The result should be
<?php
namespace App\Notifications;
use Illuminate\Bus\Queueable;
use Illuminate\Notifications\Messages\MailMessage;
use Illuminate\Notifications\Notification;
class TestNotify extends Notification
{
use Queueable;
public function __construct(public $subject, public $id)
{
//
}
public function via(object $notifiable): array
{
return ['mail'];
}
public function toMail(object $notifiable): MailMessage
{
return (new MailMessage)
->subject($this->subject)
->line('ID: ' . $this->id);
}
public function toArray(object $notifiable): array
{
return [
//
];
}
}
Let’s check if our notification works.
Let’s go to tinker
php artisan tinker
After the prompt appears, enter the code
use App\Notifications\TestNotify;
Notification::route('mail', 'test@mail.ru')->notify(new TestNotify('Test Subject', 123456));
Let’s look at the log (by default storage/logs/laravel.log)
Great! Everything is in place: the subject of the email, the ID and the email address. The log can now be deleted so that the next test email is easy to find.
Creating a job
php artisan make:job TestJob
Let’s go to app/Jobs/TestJob.php and change the constructor
public function __construct(public $email, public $subject, public $id)
{
//
}
and method handle()
public function handle(): void
{
Notification::route('mail', $this->email)->notify(new TestNotify($this->subject, $this->id));
}
The result should be
<?php
namespace App\Jobs;
use App\Notifications\TestNotify;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Notification;
class TestJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct(public $email, public $subject, public $id)
{
//
}
public function handle(): void
{
Notification::route('mail', $this->email)->notify(new TestNotify($this->subject, $this->id));
}
}
Temporarily set the queue driver to sync (QUEUE_CONNECTION=sync) and check the job in tinker. If tinker is still running, exit it first: a running tinker session knows nothing about the new job. Start tinker again and, after the prompt appears, enter
use App\Jobs\TestJob;
\Bus::dispatch(new TestJob('tset@liam.ur', 'Subject Test', 654321));
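Why is the job dispatched in such an unusual way? The Laravel documentation on Tinker explains it: the dispatch helper and the dispatch() method of the Dispatchable trait rely on garbage collection to place the job on the queue, so in tinker you should use Bus::dispatch or Queue::push instead.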
Let’s look at the log
Everything is in place again. Delete the log and change the queue driver back to database.
We now have a notification and a job that sends it. The time has come to move the dispatching of the job into a stored function. But first, let’s understand what the record of a queued job should look like.
Decoding a job entry from the jobs table
Have you switched the queue driver? Exit tinker and start it again, otherwise it will not know about the switch. Let’s send the job to the queue again using tinker. A record should appear in the jobs table.
The available_at and created_at fields hold the result of the time() function at the moment the record was inserted into the table, that is, the number of seconds that have passed from the start of the Unix epoch (1 January 1970 00:00:00 GMT) to the current time.
The payload field contains JSON, which we will examine now.
In tinker, execute
json_decode(\DB::table('jobs')->first()->payload);
Execution result
> json_decode(\DB::table('jobs')->first()->payload);
= {#6275
+"uuid": "0c2c7167-804a-4bed-9991-8827571f6be8",
+"displayName": "App\Jobs\TestJob",
+"job": "Illuminate\Queue\CallQueuedHandler@call",
+"maxTries": null,
+"maxExceptions": null,
+"failOnTimeout": false,
+"backoff": null,
+"timeout": null,
+"retryUntil": null,
+"data": {#6298
+"commandName": "App\Jobs\TestJob",
+"command": "O:16:"App\Jobs\TestJob":3:{s:5:"email";s:12:"tset@liam.ur";s:7:"subject";s:12:"Subject Test";s:2:"id";i:654321;}",
},
}
Creating such JSON in PostgreSQL is almost no problem. The only difficulty is the content of data->command, which stores the serialized App\Jobs\TestJob object.
Send this command to tinker
unserialize(json_decode(\DB::table('jobs')->first()->payload)->data->command);
We get
> unserialize(json_decode(\DB::table('jobs')->first()->payload)->data->command);
= App\Jobs\TestJob {#6318
+email: "tset@liam.ur",
+subject: "Subject Test",
+id: 654321,
+job: null,
+connection: null,
+queue: null,
+chainConnection: null,
+chainQueue: null,
+chainCatchCallbacks: null,
+delay: null,
+afterCommit: null,
+middleware: [],
+chained: [],
}
The problem is that PostgreSQL knows nothing about PHP objects and the serialization algorithm. Therefore, I decided to work with this value as a string.
Now we can start working with PostgreSQL.
Inserting a job record into the jobs table using PostgreSQL
To insert a job record into the jobs table we need to:
generate a uuid
count the number of seconds that have passed from the start of the Unix epoch (January 1, 1970 00:00:00 GMT) to the current time
build the JSON and put into it the data the job needs
Teaching PostgreSQL to generate uuid
To generate a uuid in PostgreSQL you need the functions contained in the uuid-ossp module. This module is often not enabled and has to be enabled first. I will show how to check whether the necessary functions are available, using the Docker container described above as an example.
Connecting to a running container
docker container exec -it pgsql bash
In the container, I connect to a terminal client to work with PostgreSQL – psql
# psql -U test
Checking which functions are enabled
test=# \df
List of functions
Schema | Name | Result data type | Argument data types | Type
--------+------+------------------+---------------------+------
(0 rows)
It can be seen that there are no functions for working with uuid. Let’s try to add them.
test=# CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION
test-# \df
List of functions
Schema | Name | Result data type | Argument data types | Type
--------+--------------------+------------------+---------------------------+------
public | uuid_generate_v1 | uuid | | func
public | uuid_generate_v1mc | uuid | | func
public | uuid_generate_v3 | uuid | namespace uuid, name text | func
public | uuid_generate_v4 | uuid | | func
public | uuid_generate_v5 | uuid | namespace uuid, name text | func
public | uuid_nil | uuid | | func
public | uuid_ns_dns | uuid | | func
public | uuid_ns_oid | uuid | | func
public | uuid_ns_url | uuid | | func
public | uuid_ns_x500 | uuid | | func
(10 rows)
We need the uuid_generate_v4 function. Let’s try to use it.
test=# select uuid_generate_v4();
uuid_generate_v4
--------------------------------------
8a7a8be0-e4cd-4c0a-8275-dcf2ed58c5a9
(1 row)
Great, it works!
If the functions were not added in your case, consult the PostgreSQL documentation:
In the past, this module depended on the OSSP UUID library, which is reflected in its name. Although the OSSP UUID library can still be found at http://www.ossp.org/pkg/lib/uuid/, it is poorly maintained and increasingly difficult to port to new platforms. Therefore the uuid-ossp module can now be built without the OSSP library on some platforms. On FreeBSD and some other BSD-based operating systems, suitable UUID generation functions are included in the system library libc. On Linux, macOS and some other platforms, suitable functions are provided by the libuuid library, which originally came from the e2fsprogs project (although on modern Linux distributions it is part of the util-linux-ng package). When calling configure, pass --with-uuid=bsd to use the BSD functions, --with-uuid=e2fs to use libuuid from e2fsprogs, or --with-uuid=ossp to use the OSSP UUID library. Several of these libraries may be installed on a given system at once, so configure does not select one automatically.
Teaching PostgreSQL to count seconds
PostgreSQL does not have a direct equivalent of the time() function in PHP, but the required number of seconds can be obtained with this construct
test=# select extract(epoch from current_timestamp)::integer;
extract
------------
1697631498
(1 row)
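For comparison, the same value can be checked from the PHP side in a few lines. This is only a sketch: it takes the number from the psql output above and shows that it is an ordinary Unix timestamp of the kind time() returns.

```php
<?php
declare(strict_types=1);

// Value copied from the psql output above.
$fromPostgres = 1697631498;

// gmdate() formats a Unix timestamp in UTC.
echo gmdate('Y-m-d H:i:s', $fromPostgres), PHP_EOL; // 2023-10-18 12:18:18

// time() returns the same kind of value (integer seconds) for the current moment.
$now = time();
var_dump(is_int($now));
```

Because both sides use whole seconds since the epoch, a value produced in PostgreSQL can be written to available_at and created_at without any conversion.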
Collecting json
The simplest approach is to take the record of a valid job as a basis. We take the result of the command sent to tinker, json_decode(\DB::table('jobs')->first()->payload), and build the JSON from it.
select json_build_object(
'uuid', uuid_generate_v4(),
'displayName', 'App\Jobs\TestJob',
'job', 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries', null,
'maxExceptions', null,
'failOnTimeout', false,
'backoff', null,
'timeout', null,
'retryUntil', null,
'data', (json_build_object('commandName', 'App\Jobs\TestJob',
'command', 'O:16:"App\Jobs\TestJob":3:{s:5:"email";s:12:"tset@liam.ur";s:7:"subject";s:12:"Subject Test";s:2:"id";i:654321;}'))
);
Note that instead of passing the known uuid, a new one is generated.
This query returns a valid payload for the job, but we need not only to queue the job but also to pass data to it.
Let’s look at the command field. I pass this parameter as a string. The serialized string carries the type of each value and, where required, its length. We pass three parameters to the job: two strings, email and subject, and one integer, id, which is displayed in the body of the email. Let’s find these parameters in the string.
email: s:12:"tset@liam.ur". Here s means string, 12 is the length of the string, and "tset@liam.ur" is its content, enclosed in double quotes.
Likewise for subject.
In the case of id: i:654321, i means integer and 654321 is its value.
Thus, two things have to be substituted for each string parameter: its length and its content.
Keep in mind that PHP records the length of a serialized string in bytes, not in characters, so for UTF-8 strings containing non-ASCII characters the two differ. Therefore, in PostgreSQL the length of string variables should be calculated with the octet_length() function.
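To see the same structure from the PHP side, the payload can also be assembled with json_encode(). This is a rough sketch for comparison only, not the way Laravel itself builds the payload: uuidV4() and buildPayload() are hypothetical helpers introduced here, and the command is passed in as an opaque string.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: a random (version 4) UUID built from random_bytes().
function uuidV4(): string
{
    $bytes = random_bytes(16);
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40); // set version 4
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80); // set RFC 4122 variant
    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}

// Hypothetical helper: mirrors the keys of the json_build_object() call above.
function buildPayload(string $jobClass, string $command): string
{
    return json_encode([
        'uuid' => uuidV4(),
        'displayName' => $jobClass,
        'job' => 'Illuminate\Queue\CallQueuedHandler@call',
        'maxTries' => null,
        'maxExceptions' => null,
        'failOnTimeout' => false,
        'backoff' => null,
        'timeout' => null,
        'retryUntil' => null,
        'data' => ['commandName' => $jobClass, 'command' => $command],
    ], JSON_THROW_ON_ERROR);
}

// The command string is the one shown in the tinker output.
$command = 'O:16:"App\Jobs\TestJob":3:{s:5:"email";s:12:"tset@liam.ur";'
    . 's:7:"subject";s:12:"Subject Test";s:2:"id";i:654321;}';

echo buildPayload('App\Jobs\TestJob', $command), PHP_EOL;
```

In the printed JSON the backslashes of the class names are doubled. That is ordinary JSON escaping, and json_build_object() in PostgreSQL does the same.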
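The substitution described above can be tried out in PHP before moving it into SQL. The following is a minimal sketch, not the code used in the application: DemoJob is a hypothetical stand-in for App\Jobs\TestJob so that the example runs without Laravel, and buildCommand() is an illustrative helper that concatenates the string piece by piece. mb_strlen() assumes the mbstring extension is installed.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for App\Jobs\TestJob, so the example runs without Laravel.
final class DemoJob
{
    public function __construct(
        public string $email,
        public string $subject,
        public int $id,
    ) {}
}

// Builds the serialized string by hand; every length is a byte count (strlen).
function buildCommand(string $class, string $email, string $subject, int $id): string
{
    return sprintf(
        'O:%d:"%s":3:{s:5:"email";s:%d:"%s";s:7:"subject";s:%d:"%s";s:2:"id";i:%d;}',
        strlen($class), $class,
        strlen($email), $email,
        strlen($subject), $subject,
        $id
    );
}

$subject = 'Тест'; // four Cyrillic characters, two bytes each in UTF-8

var_dump(strlen($subject));    // int(8): bytes, what the serialized string needs
var_dump(mb_strlen($subject)); // int(4): characters, the wrong number here

$command = buildCommand(DemoJob::class, 'e@mail.ru', $subject, 42);

// The hand-built string is identical to what PHP itself produces...
var_dump($command === serialize(new DemoJob('e@mail.ru', $subject, 42)));

// ...and can be turned back into an object.
$job = unserialize($command);
echo $job->subject, PHP_EOL;
```

If the lengths were taken in characters instead of bytes, unserialize() would fail for the Cyrillic subject, which is why octet_length() rather than length() is the right choice on the PostgreSQL side.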
PostgreSQL function
Creating a migration
php artisan make:migration create_set_job_function
Migration Contents
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Support\Facades\DB;
return new class extends Migration
{
    public function up(): void
    {
        // Nowdoc (<<<'SQL'): PHP leaves "$" and "\" in the SQL text untouched.
        $sql = <<<'SQL'
        create function set_job(_email character varying, _subject character varying, _id integer) returns void
            language plpgsql
        as
        $$
        declare
            _time int;
            _payload json;
            _command varchar;
        begin
            -- Seconds since the Unix epoch, like time() in PHP.
            _time := extract(epoch from current_timestamp)::integer;
            -- Serialized job object; string lengths are counted in bytes.
            _command := 'O:16:"App\Jobs\TestJob":3:{s:5:"email";s:' ||
                        octet_length(_email) || ':"' ||
                        _email || '";s:7:"subject";s:' ||
                        octet_length(_subject) || ':"' ||
                        _subject || '";s:2:"id";i:' || _id::varchar || ';}';
            _payload := json_build_object(
                'uuid', uuid_generate_v4(),
                'displayName', 'App\Jobs\TestJob',
                'job', 'Illuminate\Queue\CallQueuedHandler@call',
                'maxTries', null,
                'maxExceptions', null,
                'failOnTimeout', false,
                'backoff', null,
                'timeout', null,
                'retryUntil', null,
                'data', (json_build_object('commandName', 'App\Jobs\TestJob',
                                           'command', _command)));
            insert into jobs(queue, payload, attempts, reserved_at, available_at, created_at)
            values ('default', _payload, 0, null, _time, _time);
        end;
        $$;
        alter function set_job(varchar, varchar, integer) owner to test;
        SQL;
        DB::unprepared($sql);
    }
    public function down(): void
    {
        DB::unprepared('drop function if exists set_job(varchar, varchar, integer)');
    }
};
Running the migration
php artisan migrate
Go to the container terminal and try to call this function
test=# select set_job('e@mail.ru', 'Кирилица в заголовке', 09876);
set_job
---------
(1 row)
Launch the worker in the terminal
php artisan queue:work --once
And look at the log
Oops. Almost everything is correct. Homework for readers: find the error and correct the code of the stored function so that it works correctly for any set of data.
Epilogue
In this simple way you can make Laravel react to events occurring in the database, whether they come from stored functions or from triggers.
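The tutorial only shows a stored function, so here is a minimal sketch of the trigger variant as one more migration. It rests on several assumptions that do not come from the test application: a hypothetical table events with a varchar column email and an integer column id, and placeholder names events_set_job and events_set_job_trigger. It relies on the set_job function created earlier and on PostgreSQL 11 or newer for the execute function syntax.

```php
<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Support\Facades\DB;

return new class extends Migration
{
    public function up(): void
    {
        // Assumption: a table "events" with columns "email" and "id" already exists.
        DB::unprepared(<<<'SQL'
        create function events_set_job() returns trigger
            language plpgsql
        as
        $$
        begin
            -- Queue the Laravel job for every inserted row.
            perform set_job(new.email, 'New event', new.id);
            return new;
        end;
        $$;

        create trigger events_set_job_trigger
            after insert on events
            for each row
            execute function events_set_job();
        SQL);
    }

    public function down(): void
    {
        DB::unprepared(<<<'SQL'
        drop trigger if exists events_set_job_trigger on events;
        drop function if exists events_set_job();
        SQL);
    }
};
```

With such a trigger in place, every insert into the hypothetical events table adds a row to jobs in the same transaction, and the worker picks it up like any other queued job.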