ease of asynchronous operations on PostgreSQL with SQLAlchemy

Hello friends! This article will be a real boon for anyone who is already familiar with the asyncpg-lite library, wants to get acquainted with it, or simply wants to easily and effectively use the asynchronous capabilities of SQLAlchemy with asyncpg.

What is asynpg-lite?

Imagine a library that combines the power of asynchronous programming via asyncpg with the endless possibilities of SQLAlchemy. This is asynpg-lite – a simple and reliable library created so that everyone, even beginners, can take advantage of its potential.

Why is it important?

In a world where asynchronous programming is becoming increasingly popular, asynpg-lite offers the ideal solution for working with PostgreSQL. You don't need in-depth programming knowledge or complex concepts. A basic understanding of Python (lists, dictionaries, strings, etc.) and the basics of SQL and PostgreSQL (tables, CRUD operations, column types, etc.) is sufficient.

What awaits you?

We'll start with the very basics and walk through the basic methods of this library step by step. This will allow you to immediately start using it in your projects and experience all the benefits of asynchronous work with the database.

Ready to dive into the world of asynchronous operations on PostgreSQL using asynpg-lite and SQLAlchemy? Then let's get started!

Beginning of work

To test the examples I'll give you, you'll need a PostgreSQL database. It can be installed either on your local computer or located remotely, for example, on a VPS server.

How to deploy a PostgreSQL database on a VPS server in 5 minutes, I described in this article: [читать статью].

Today we will analyze all the functions of the library without analyzing the code of the library itself (that is, today it is just practice).

Installation

To install the latest current version of the library, run the command:

pip install --upgrade asyncpg-lite

Or use requirements.txt (0.3.1.3 is the most current version at the moment, all versions less than 0.3 have been removed and are no longer available for download):

asyncpg-lite~=0.3.1.3
pip install -r requirements.txt

Class initiation

After installing the library, you need to initiate a class to work with asynpg-lite so that the object can connect to the database and interact with it.

Importing the library:

from asyncpg_lite import DatabaseManager

Now let's connect to the database. For convenience, I have provided two connection options:

  • Via DSN link

  • Via connection data (pass a Python dictionary with connection data).

Initiation option via DSN link

postgresql://USER_LOGIN:PASSWORD@HOST:PORT/DB_NAME

Please note that the library itself will transform the link into the correct format for interaction with asyncpg (postgresql + postgresql).

Besides. The new format allows @ to be specified in the connection password.

An example of using environment variables to hide a DSN link in a .env file (using the python-decouple library):

from asyncpg_lite import DatabaseManager
from decouple import config
import asyncio

pg_link = config('PG_LINK')
deletion_password = config('ROOT_PASS')


async def main():
    db_manager = DatabaseManager(db_url=pg_link, deletion_password=deletion_password)
    # Далее используем db_manager...

Connection option indicating connection data

To do this, when initiating the manager object, you must pass the auth_params parameter. This is a Python dictionary containing the following keys:

  • host: str

  • port: int

  • user: str

  • password: str

  • database: str

Required parameter: deletion_password

The main meaning of the parameter is that when using critical functions, such as deleting a table or all data from a table, it is necessary to explicitly specify the password that was passed when initiating the class. This is done to protect against accidental deletion of data and unauthorized access.

When initiating a class, there are also the following optional parameters:

  • expire_on_commit: bool (default True)

  • echo: bool (default False)

  • log_level (default logging.INFO)

expire_on_commit

The expire_on_commit flag is used to control the expiration of objects stored in a session.

expire_on_commit=True (default): After commit() is called, all objects that were modified in the session are marked as “obsolete”. This means that the next time these objects are accessed they will be automatically reloaded from the database. This ensures data consistency and ensures that changes made by other sessions or processes are visible.

expire_on_commit=False: Objects will not be marked as expired after calling commit(). This can be useful in cases where you are confident that the data in the database is not being modified by other processes and want to avoid additional queries to the database to improve performance.

echo

The echo flag controls the output of SQL queries to the console.

echo=False (default): Do not print SQL queries to the console.

echo=True: Output all SQL queries executed by the engine to the console. This is useful for debugging as it allows you to see all the requests that are sent to the database.

log_level

log_level is the logging level. In total there are the following options:

  • DEBUG: Detailed information that is mainly useful for diagnosing problems. Used for development and debugging.

  • INFO: Confirmation that things are working as expected. Used for general information about the progress of the program.

  • WARNING: An indication that something unexpected has happened, or indicates a problem in the near future (for example, “the disk will run out soon”). The program works as expected.

  • ERROR: Due to a more serious problem, the program cannot perform a certain function.

  • CRITICAL: A serious error that may prevent the program from continuing execution.

General principle of interaction with the manager

To work with the database manager from asyncpg_lite, you must use the asynchronous context manager with. Thanks to this approach, you have two options at once:

  • Create complex and multi-level queries against a PostgreSQL database.

  • Don't worry about closing the database connection as it is automatically closed once the manager is finished.

Now let's instantiate our first class object (I'll use a dictionary to connect):

from asyncpg_lite import DatabaseManager
import asyncio


from decouple import config
async def main():
    db_manager = DatabaseManager(auth_params=autch_param, deletion_password=root_pass)
    async with db_manager:
        pass

Even though we haven't executed any methods, our logs will show:

2024-06-20 21:55:53,233 – INFO – Database instance created with log level: INFO

2024-06-20 21:55:53,301 – INFO – Connect with PostgreSQL success!

2024-06-20 21:55:53,302 – INFO – Disconnected from the database.

In this example we see that we have successfully initiated the object, connected to the database and, since we used async with db_managerbut the method was not launched – the connection ended automatically.

Now that we know how to connect to the database and how to interact with it, let's start practicing.

Library functions

The asyncpg-lite library can:

  • Create tables.

  • Receive data (returned as a list of Python dictionaries or as a dictionary, accepts filtering in the form of lists and dictionaries).

  • Add data to the table with data update if there is a conflict on the primary key or ignoring re-adding (I will explain in detail below)

  • Change data (accepts a condition for selecting entries in the form of a dictionary or list of dictionaries and accepts a dictionary with new data).

  • Delete data (can delete a table, all data from a table, or values ​​from a table using special filters).

As you can see, the library covers all CRUD operations using only the basic data types from Python 3.

Creating a table

To create a table, use the create_table method. The function accepts:

  • table_name: Table name.

  • columns: A list of strings defining the columns of the table.

Sample code to create a user table:

async def main():
    from sqlalchemy import BigInteger, String, Integer

    db_manager = DatabaseManager(auth_params=autch_param, deletion_password=root_pass)
    async with db_manager:
        columns = [
            {"name": "user_id", "type": BigInteger, "options": {"primary_key": True, "autoincrement": False}},
            {"name": "user_name", "type": String, "options": {"default": 'Vasya', 'unique': True}},
            {"name": "user_surname", "type": String},
            {"name": "user_age", "type": Integer},
        ] 
await db_manager.create_table(table_name="new_users", columns=columns)

Note. To describe table columns, you must use a list of Python dictionaries, where one dictionary is a description of one column.

Each dictionary must have 2 required parameters: name (column name) and type (data type in the column). Additionally, you can pass the options key. There, for example, you can specify a default value, indicate that this is a primary key, and so on.

The only thing that may not be clear here is the description of data types. Although it is insignificant, it differs in SQLAlchemy from its counterparts in SQL (PostgreSQL). Below is a comparison table between Python data types, SQL data types and SQLAlchemy.

As you already understood, each data type must be imported from sqlalchemy. sqlalchemy itself will install automatically with the installation of asyncpg-lite.

Let's execute the function and look at the logs:

2024-06-20 22:27:33,027 - INFO - Database instance created with log level: INFO
2024-06-20 22:27:33,104 - INFO - Connect with PostgreSQL success!
2024-06-20 22:27:35,145 - INFO - Table 'new_users' created with columns: ['user_id', 'user_name', 'user_surname', 'user_age']
2024-06-20 22:27:35,221 - INFO - Disconnected from the database.

Let's look at the table:

We see that the table was created with the data that we wanted (the unique type is not visible on the screen, but if you add the same name again there will be an error).

All other methods will be in pure Python, so it will be simpler.

Adding data to a table

To add (update) data to the table, we will use the insert_data_with_update method. This method accepts the following parameters:

  • table_name: Name of the table into which we will insert data

  • records_data: A dictionary or list of dictionaries with data to be inserted.

  • conflict_column: Name of value in table with PrimaryKey (string)

  • update_on_conflict: Flag with which you indicate whether the data needs to be updated if there is a conflict on the primary key. If you set it to True (this is the default), then in the event of a conflict, all parameters in the line will be rewritten to match the array that you passed. Otherwise, conflicting entries will simply be ignored

The method will automatically determine whether a dictionary or a list of dictionaries is passed.

Let's try to add one user to our created table:

async def main():
    db_manager = DatabaseManager(auth_params=autch_param, deletion_password=root_pass)
    async with db_manager:
        user_data = {'user_id': 1, 'user_name': 'Вася', 'user_surname': 'Сидоров', 'user_age': 40}
        await db_manager.insert_data_with_update(table_name="new_users",
                                                 records_data=user_data,
                                                 conflict_column='user_id', update_on_conflict=False)

Execute and look at the logs:

2024-06-20 23:15:22,570 - INFO - Database instance created with log level: INFO
2024-06-20 23:15:22,639 - INFO - Connect with PostgreSQL success!
2024-06-20 23:15:26,997 - INFO - Inserted/Updated 1 records into table 'new_users'.
2024-06-20 23:15:27,073 - INFO - Disconnected from the database.

Let's look at the table:

We see that the user has been added. Now let's try to add several users at once, updating the name of the user with ID 1:

async def main():
    db_manager = DatabaseManager(auth_params=autch_param, deletion_password=root_pass)
    async with db_manager:
        user_data = [
            {'user_id': 1, 'user_name': 'Василий', 'user_surname': 'Сидоренко', 'user_age': 40},
            {'user_id': 2, 'user_name': 'Дмитрий', 'user_surname': 'Иванов', 'user_age': 30},
            {'user_id': 3, 'user_name': 'Олег', 'user_surname': 'Смирнов', 'user_age': 46},
            {'user_id': 4, 'user_name': 'Петр', 'user_surname': 'Петров', 'user_age': 23}
        ]
        await db_manager.insert_data_with_update(table_name="new_users",
                                                 records_data=user_data,
                                                 conflict_column='user_id', update_on_conflict=False)

Let's look at the table:

We see that the data has not been updated, but at the same time we did not receive any errors.

Now let's add one new user and update the data for the other 2:

async def main():
    db_manager = DatabaseManager(auth_params=autch_param, deletion_password=root_pass)
    async with db_manager:
        user_data = [
            {'user_id': 1, 'user_name': 'Василий', 'user_surname': 'Сидоренко', 'user_age': 40},
            {'user_id': 4, 'user_name': 'Петя', 'user_surname': 'Петров', 'user_age': 24},
            {'user_id': 5, 'user_name': 'Антон', 'user_surname': 'Антонов', 'user_age': 54}
        ]
        await db_manager.insert_data_with_update(table_name="new_users",
                                                 records_data=user_data,
                                                 conflict_column='user_id', update_on_conflict=True)

Let's do it and look at the table:

We see that the data has been successfully updated/added. The logs tell us this:

2024-06-20 23:21:06,166 - INFO - Database instance created with log level: INFO
2024-06-20 23:21:06,234 - INFO - Connect with PostgreSQL success!
2024-06-20 23:21:10,639 - INFO - Inserted/Updated 3 records into table 'new_users'.
2024-06-20 23:21:10,716 - INFO - Disconnected from the database.

Receiving data

To retrieve data from a table, use the select_data method. The method accepts the following parameters:

  • table_name: Table name.

  • where_dict: Conditions for filtering data.

  • columns: List of columns to retrieve.

  • one_dict: Whether to return only one entry as a dictionary.

Let's get all user values ​​with user_id equal to 3:

async def main():
    db_manager = DatabaseManager(auth_params=autch_param, deletion_password=root_pass)
    async with db_manager:
        user_info = await db_manager.select_data(table_name="new_users", where_dict={'user_id': 3})
        print(user_info)

Let's look at the logs:

2024-06-20 23:23:38,724 - INFO - Database instance created with log level: INFO
2024-06-20 23:23:38,823 - INFO - Connect with PostgreSQL success!

[{'user_id': 3, 'user_name': 'Олег', 'user_surname': 'Смирнов', 'user_age': 46}]

2024-06-20 23:23:43,245 - INFO - Selected 1 rows from table 'new_users'.
2024-06-20 23:23:43,320 - INFO - Disconnected from the database.

Note that despite the request being made by one user, a list is returned. For convenience in such cases, the one_dict flag is provided, which returns one dictionary:

user_info = await db_manager.select_data(table_name="new_users", where_dict={'user_id': 3}, one_dict=True)

We see that one dictionary is returned. This is convenient when you need to get information about a specific user, for example, in Telegram bots.

If you do not pass the columns argument, all columns are returned by default. You can pass a list of columns whose values ​​need to be retrieved:

user_info = await db_manager.select_data(table_name="new_users", where_dict={'user_id': 3}, one_dict=True, columns=['user_name', 'user_surname'])

Logs:

2024-06-20 23:25:47,402 - INFO - Database instance created with log level: INFO
2024-06-20 23:25:47,471 - INFO - Connect with PostgreSQL success!

{'user_name': 'Олег', 'user_surname': 'Смирнов'}

2024-06-20 23:25:52,167 - INFO - Disconnected from the database.

Now we get a list of all users with all values:

users_info = await db_manager.select_data(table_name="new_users")
for i in users_info:
    print(i)

Logs:

2024-06-20 23:27:44,575 - INFO - Database instance created with log level: INFO
2024-06-20 23:27:44,655 - INFO - Connect with PostgreSQL success!

{'user_id': 2, 'user_name': 'Дмитрий', 'user_surname': 'Иванов', 'user_age': 30}
{'user_id': 3, 'user_name': 'Олег', 'user_surname': 'Смирнов', 'user_age': 46}
{'user_id': 1, 'user_name': 'Василий', 'user_surname': 'Сидоренко', 'user_age': 40}
{'user_id': 4, 'user_name': 'Петя', 'user_surname': 'Петров', 'user_age': 24}
{'user_id': 5, 'user_name': 'Антон', 'user_surname': 'Антонов', 'user_age': 54}

2024-06-20 23:27:49,148 - INFO - Selected 5 rows from table 'new_users'.
2024-06-20 23:27:49,226 - INFO - Disconnected from the database.

Now let’s complicate the query and get the first and last name values ​​for those users whose user_id is 1, 3 or 4:

user_info = await db_manager.select_data(table_name="new_users",
                                         where_dict=[{'user_id': 1}, {'user_id': 3}, {'user_id': 4}],
                                         columns=['user_surname', 'user_name'])
for user_data in user_info:
    print(user_data)

Logs:

2024-06-20 23:28:55,190 - INFO - Database instance created with log level: INFO
2024-06-20 23:28:55,262 - INFO - Connect with PostgreSQL success!

{'user_surname': 'Смирнов', 'user_name': 'Олег'}
{'user_surname': 'Сидоренко', 'user_name': 'Василий'}
{'user_surname': 'Петров', 'user_name': 'Петя'}

2024-06-20 23:28:59,702 - INFO - Selected 3 rows from table 'new_users'.
2024-06-20 23:28:59,781 - INFO - Disconnected from the database.

Changing data

To change data, the asyncpg-lite library provides a universal method update_data. This method accepts the following parameters:

  • table_name: Table name.

  • where_dict: Conditions for selecting entries to update (dictionary or list of dictionaries).

  • update_dict: Dictionary with data to update or string.

Let's look at an example usage:

async def main():
    db_manager = DatabaseManager(auth_params=autch_param, deletion_password=root_pass)
    async with db_manager:
        # Пример одиночного словаря
        await db_manager.update_data(
            table_name="new_users",
            where_dict={'user_id': 2},
            update_dict={'user_surname': 'Иванов', 'user_age': 42}
        )

        # Пример списка словарей
        await db_manager.update_data(
            table_name="new_users",
            where_dict=[{'user_id': 2}, {'user_id': 3}],
            update_dict={'user_surname': 'Руссо', 'user_age': 40}
        )


asyncio.run(main())

Logs:

2024-06-20 23:32:37,676 - INFO - Database instance created with log level: INFO
2024-06-20 23:32:37,753 - INFO - Connect with PostgreSQL success!
2024-06-20 23:32:42,026 - INFO - Updated records in table 'new_users' with conditions {'user_id': 2}.
2024-06-20 23:32:43,699 - INFO - Updated records in table 'new_users' with conditions [{'user_id': 2}, {'user_id': 3}].
2024-06-20 23:32:43,860 - INFO - Disconnected from the database.

Please note: I did not just use the method alone, but applied serial data processing. In this example, you can see the advantage of using the async with approach.

Deleting data

As I described above, the library provides three formats for deleting data:

  • Removing records from a table by parameters (method delete_data).

  • Removing all records from a table (method delete_all_data).

  • Deleting a table (method drop_table).

delete_data method

Method delete_data accepts the following parameters:

  • table_name: Table name.

  • where_dict: Conditions for selecting entries to delete (dictionary or list of dictionaries).

Let's remove the user from the table with user_id = 1:

await db_manager.delete_data('new_users', {'user_id': 1})

Logs:

2024-06-21 00:08:06,420 - INFO - Database instance created with log level: INFO
2024-06-21 00:08:06,491 - INFO - Connect with PostgreSQL success!
2024-06-21 00:08:10,817 - INFO - Deleted records from table 'new_users' with conditions {'user_id': 1}.
2024-06-21 00:08:10,895 - INFO - Disconnected from the database.

We check:

We see that the entry has been deleted.

Now let's remove users from user_id = 2 and 3:

await db_manager.delete_data('new_users', [{'user_id': 2}, {'user_id': 3}])

Logs:

2024-06-21 00:11:32,750 - INFO - Database instance created with log level: INFO
2024-06-21 00:11:32,847 - INFO - Connect with PostgreSQL success!
2024-06-21 00:11:38,181 - INFO - Deleted records from table 'new_users' with conditions [{'user_id': 2}, {'user_id': 3}].
2024-06-21 00:11:38,277 - INFO - Disconnected from the database.

Let's look:

We see that there is only one user left in the table. We will remove it using the method delete_all_data. This method accepts:

  • table_name: Table name.

  • password: Password that must match the verification password specified when the manager was initialized.

Code:

await db_manager.delete_all_data('new_users', password=root_pass)

Logs:

2024-06-21 00:15:23,375 - INFO - Database instance created with log level: INFO
2024-06-21 00:15:23,467 - INFO - Connect with PostgreSQL success!
2024-06-21 00:15:28,184 - INFO - Deleted all data from table 'new_users'.
2024-06-21 00:15:28,265 - INFO - Disconnected from the database.

All information has been successfully deleted.

The drop_table method accepts the same data as delete_all_data, but deletes the table.

await db_manager.drop_table('new_users', password=root_pass)

Logs:

2024-06-21 00:16:57,158 - INFO - Database instance created with log level: INFO
2024-06-21 00:16:57,228 - INFO - Connect with PostgreSQL success!
2024-06-21 00:17:02,850 - INFO - Table 'new_users' dropped.
2024-06-21 00:17:02,947 - INFO - Disconnected from the database.

Conclusion:

The library is currently in beta testing. Therefore, those who decide to use it, give feedback in the comments to this publication. Please indicate what you would like to add or add to my library.

Guys who have extensive experience in developing and interacting with PostgreSQL via Python, who also want to create something useful for everyone, contact me in any way possible.

I know that the code that currently underlies the library is not perfect. Although it is already able to solve many problems when interacting with a PostgreSQL database, making this process not just comfortable, but also enjoyable.

That's all I wanted to say. I hope that the project asyncpg-lite will develop further. Thank you for your attention and I look forward to your feedback on the library, I hope it will be pleasant.

PS. In the first description of the library, at the end, I gave a practical task. I strongly recommend repeating it with the current update.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *