Queuing a Laravel Job from a PostgreSQL Stored Procedure or Trigger

Why was this needed?

There is an application A , which is spinning somewhere out there, and to which I do not have access. This application uses an API implemented using stored procedures. There is an application Bwhich configures data for the application Aand also generates reports on the operation of the application A. To the application B and I have access to stored procedures.

Over time the application B overgrown with a telegram bot. Everyone liked receiving notifications in real time, and then a wish came in: when in the application A an event occurs Xnotify application users B via telegram bot.

The task would not be difficult if it could be agreed that the application A notified the application B about the occurrence of an event X.

But it was not possible to reach an agreement.

Analysis of the system showed that when an event occurs Xapplication A calls a stored procedure Y. After thinking a little, I decided to run a Job in the application B, by simply inserting a line into the jobs table. And this task will send an alert via a telegram bot.

What does it take to be successful?

The queue driver must be selected as database. Workers must be configured and running.

Implementation

It’s better to see and touch once than to read the theory a hundred times, so I describe the implementation in the form of a tutorial for a test application.

If you think this is unnecessary, go straight to this section. It’s never too late to go back to the beginning.

Unfortunately, I did not find an elegant solution for the general case; the problem is solved “head-on” and for each task the solution will be unique.

But I will describe the general idea.

Our test application will send a letter and this sending will be initiated by a stored procedure.

Creating a Test Application

composer create-project laravel/laravel pgsql_job
cd pgsql_job
sudo chown -R $USER:www-data storage
sudo chown -R $USER:www-data bootstrap/cache
chmod -R 775 storage
chmod -R 775 bootstrap/cache

Set up a connection to the database. If there is no database, you can run

docker container
docker run -d -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test \
-p 5432:5432 --name pgsql postgres

After that, contact the database. Settings in .env

DB_CONNECTION=pgsql
DB_HOST=127.0.0.1
DB_PORT=5432
DB_DATABASE=test
DB_USERNAME=test
DB_PASSWORD=test

Specify the queue driver as database (QUEUE_CONNECTION=database).

We will conduct experiments on sending letters. To avoid setting up the connection, set MAIL_MAILER=log. Let the letters fall into the logs.

Creating a notification

make:php artisan make:notification TestNotify

Let’s go to app/Notifications/TestNotify.php and change the constructor

    public function __construct(public $subject, public $id)
    {
        //
    }

and method toMail()

    public function toMail(object $notifiable): MailMessage
    {
        return (new MailMessage)
            ->subject($this->subject)
            ->line('ID: ' . $this->id);
    }
The result should be
<?php

namespace App\Notifications;

use Illuminate\Bus\Queueable;
use Illuminate\Notifications\Messages\MailMessage;
use Illuminate\Notifications\Notification;

class TestNotify extends Notification
{
    use Queueable;

    public function __construct(public $subject, public $id)
    {
        //
    }

    public function via(object $notifiable): array
    {
        return ['mail'];
    }

    public function toMail(object $notifiable): MailMessage
    {
        return (new MailMessage)
            ->subject($this->subject)
            ->line('ID: ' . $this->id);
    }

    public function toArray(object $notifiable): array
    {
        return [
            //
        ];
    }
}

Let’s check if our notification works.

Let’s go to tinker

php artisan tinker

After the invitation appears, enter the code

 use App\Notifications\TestNotify;
 Notification::route('mail', 'test@mail.ru')->notify(new TestNotify('Test Subject', 123456));
Let’s look at the log

Great! Everything is in place. And the subject of the letter, and ID and postal address. The log can be deleted so as not to search for the fact that the letter was sent again.

Create a task

php artisan make:job TestJob

Let’s go to app/Jobs/TestJob.php and change the constructor

    public function __construct(public $email, public $subject, public $id)
    {
        //
    }

and method handle()

    public function handle(): void
    {
        Notification::route('mail', $this->email)->notify(new TestNotify($this->subject, $this->id));
    }
The result should be
<?php

namespace App\Jobs;

use App\Notifications\TestNotify;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Notification;

class TestJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(public $email, public $subject, public $id)
    {
        //
    }

    public function handle(): void
    {
        Notification::route('mail', $this->email)->notify(new TestNotify($this->subject, $this->id));
    }
}

Temporarily configure the queue driver to sync (QUEUE_CONNECTION=sync) and check the operation of our task in tinker. If you haven’t left it yet, get out. The launched tinker knows nothing about our new task. We go to tinker and after the invitation appears, enter

use App\Jobs\TestJob;
\Bus::dispatch(new TestJob('tset@liam.ur', 'Subject Test', 654321));

Why was the task sent so strangely? Answer is here.

Let’s look at the log

Everything is in place again. Delete the log, change the queue driver to database.

We have received a notification and the task that this notification sends. The time has come to “stuff” the sending of the task into a stored function. But first, let’s understand what a record of queuing a job should look like.

Decoding a job entry from the jobs table

Have you switched the queue driver? Exit and enter the tinker. Otherwise, the tinker will not know anything about this switch. Let’s send the task to the queue again using Tinker. In the table jobs an entry should appear.

Entry from the jobs table

Entry from the jobs table

Fields available_at And created_at equal to the result of the function execution time() at the time of inserting a record into the table. This is the number of seconds that have passed from the start of the Unix epoch (1 January 1970 00:00:00 GMT) to the current time.

Field payload contains json, which we will deal with now.

We go to Tinker and execute

json_decode(\DB::table('jobs')->first()->payload);

Execution result

> json_decode(\DB::table('jobs')->first()->payload);
= {#6275
    +"uuid": "0c2c7167-804a-4bed-9991-8827571f6be8",
    +"displayName": "App\Jobs\TestJob",
    +"job": "Illuminate\Queue\CallQueuedHandler@call",
    +"maxTries": null,
    +"maxExceptions": null,
    +"failOnTimeout": false,
    +"backoff": null,
    +"timeout": null,
    +"retryUntil": null,
    +"data": {#6298
      +"commandName": "App\Jobs\TestJob",
      +"command": "O:16:"App\Jobs\TestJob":3:{s:5:"email";s:12:"tset@liam.ur";s:7:"subject";s:12:"Subject Test";s:2:"id";i:654321;}",
    },
  }

Actually, creating such json in PostgreSQL is almost no problem. The only difficulty is the content of data->command, which stores the serialized object App\Jobs\TestJob

We send a command to tinker

unserialize(json_decode(\DB::table('jobs')->first()->payload)->data->command);

We get

> unserialize(json_decode(\DB::table('jobs')->first()->payload)->data->command);
= App\Jobs\TestJob {#6318  
    +email: "tset@liam.ur",
    +subject: "Subject Test",
    +id: 654321,
    +job: null,
    +connection: null,
    +queue: null,
    +chainConnection: null,
    +chainQueue: null,
    +chainCatchCallbacks: null,
    +delay: null,
    +afterCommit: null,
    +middleware: [],
    +chained: [],
  }

The problem is that PostgreSQL knows nothing about PHP objects and the serialization algorithm. Therefore, I decided to work with this value as a string.

Now you can start working with PostgreSQL

Inserting a job record into the jobs table using PostgreSQL

To insert a job record into the jobs table we need:

Learning PostgreSQL to generate uuid

To generate uuid in PostgreSQL you need the functions contained in the module uuid-ossp. Often this module is disabled and needs to be enabled. I’ll show you how to check if the necessary functions are enabled using the example of the Docker container described above.

Connecting to a running container

docker container exec -it pgsql bash

In the container, I connect to a terminal client to work with PostgreSQL – psql

# psql -U test

Checking which functions are enabled

test=# \df
                       List of functions
 Schema | Name | Result data type | Argument data types | Type
--------+------+------------------+---------------------+------
(0 rows)

It can be seen that there are no functions for working with uuid. Let’s try to add them.

test=# CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION
test-# \df
                                 List of functions
 Schema |        Name        | Result data type |    Argument data types    | Type
--------+--------------------+------------------+---------------------------+------
 public | uuid_generate_v1   | uuid             |                           | func
 public | uuid_generate_v1mc | uuid             |                           | func
 public | uuid_generate_v3   | uuid             | namespace uuid, name text | func
 public | uuid_generate_v4   | uuid             |                           | func
 public | uuid_generate_v5   | uuid             | namespace uuid, name text | func
 public | uuid_nil           | uuid             |                           | func
 public | uuid_ns_dns        | uuid             |                           | func
 public | uuid_ns_oid        | uuid             |                           | func
 public | uuid_ns_url        | uuid             |                           | func
 public | uuid_ns_x500       | uuid             |                           | func
(10 rows)

We need the uuid_generate_v4 function. Let’s try to use it.

test=# select uuid_generate_v4();
           uuid_generate_v4
--------------------------------------
 8a7a8be0-e4cd-4c0a-8275-dcf2ed58c5a9
(1 row)

Great, it works!

If in your case the functions have not been added, please contact PostgreSQL documentation.

In the past, this module depended on the OSSP UUID library, which was reflected in its name. Although the OSSP UUID library can still be found at http://www.ossp.org/pkg/lib/uuid/, it is poorly supported and increasingly difficult to port to new platforms. Therefore the module uuid-ossp It is now possible to build without the OSSP library on some platforms. On FreeBSD and some other BSD-based OSes, suitable UUID generation functions are included in the system library libc. On Linux, macOS and some other platforms, suitable functions are provided by the library libuuidwhich originally came from the project e2fsprogs (although on modern Linux distributions it is part of the package util-linux-ng). Calling configurepass the key --with-uuid=bsdto use BSD features, or --with-uuid=e2fsto use libuuid from e2fsprogsor key --with-uuid=osspto use the OSSP UUID library. Several libraries may be installed on a given system at once, so configure does not select the library automatically.

Teaching PostgreSQL to count seconds

PostgreSQL does not have a function similar to the time() function in PHP. Therefore, you can get the required number of seconds using this design

test=# select extract(epoch from current_timestamp)::integer;
  extract
------------
 1697631498
(1 row)

Collecting json

The simplest thing is to take a ready-made record of a valid task as a basis. We take as a basis the result of the command sent to the tinker: json_decode(\DB::table(‘jobs’)->first()->payload) and based on the result obtained, we collect json.

select json_build_object(
               'uuid', uuid_generate_v4(),
               'displayName', 'App\Jobs\TestJob',
               'job', 'Illuminate\Queue\CallQueuedHandler@call',
               'maxTries', null,
               'maxExceptions', null,
               'failOnTimeout', false,
               'backoff', null,
               'timeout', null,
               'retryUntil', null,
               'data', (json_build_object('commandName', 'App\Jobs\TestJob',
                                          'command', 'O:16:"App\Jobs\TestJob":3:{s:5:"email";s:12:"tset@liam.ur";s:7:"subject";s:12:"Subject Test";s:2:"id";i:654321;}'))
       );

Note that instead of passing the known uuid, a new one is generated.

This request returns a valid payload for the task, but we need to not only queue the task, but pass data to it.

Let’s look at the command field. I pass this parameter as a string. The serialized string carries the data type and its dimension, if required. We pass three parameters to the task: two string parameters – email and subject and one integer. Which is displayed in the body of the letter. Let’s find these parameters in the line.

email: s:12:”tset@liam.ur”. s — string, 12 — length of string, “tset@liam.ur” — contents of the string, escaped with double quotes.

Likewise for subject.

In the case of id: i:654321 i is an integer, 654321 is its value.

Thus, it is necessary to replace the following parameters: dimension and content.

It should be remembered that in PostgreSQL, calculating the length of strings encoded in utf-8 is exactly the same problem as in PHP. Therefore, the length of string variables should be calculated using the octet_length() function;

PostgreSQL function

Creating a migration

php artisan make:migration create_set_job_function
Migration Contents
<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class extends Migration
{
    public function up(): void
    {
        $sql = <<<HERE
create function set_job(_email character varying, _subject character varying, _id integer) returns void
    language plpgsql
as
$$
declare
    _time int;
    _payload json;
    _command varchar;

begin
    _time := extract(epoch from current_timestamp)::integer;
    _command = 'O:16:"App\Jobs\TestJob":3:{s:5:"email";s:' ||
               octet_length(_email) || ':"' ||
               _email ||'";s:7:"subject";s:'||
               octet_length(_subject) ||':"' ||
               _subject || '";s:2:"id";i:' || _id::varchar || ';}';
    _payload = json_build_object(
            'uuid', uuid_generate_v4(),
            'displayName', 'App\Jobs\TestJob',
            'job', 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries', null,
            'maxExceptions', null,
            'failOnTimeout', false,
            'backoff', null,
            'timeout', null,
            'retryUntil', null,
            'data', (json_build_object('commandName', 'App\Jobs\TestJob',
                                       'command', _command)));
    insert into jobs(queue, payload, attempts, reserved_at, available_at, created_at)
        values ('default', _payload, 0, null, _time, _time);
end;
$$;

alter function set_job(varchar, varchar, integer) owner to test;
HERE;
        DB::unprepared($sql);
    }

    public function down(): void
    {
        DB::unprepared('drop function if exists set_job');
    }
};

Performing migration

php artisan migrate 

Go to the container terminal and try to call this function

test=# select set_job('e@mail.ru', 'Кирилица в заголовке', 09876);
 set_job
---------

(1 row)

Launch the worker in the terminal

php artisan queue:work --once
And look at the log

Oops. Almost everything is correct. Homework for readers: find the error and correct the code of the stored function so that it works correctly for any set of data.

Epilogue

In this simple way you can make Laravel react to events occurring in the database: stored functions and triggers.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *