code improvements, methods for updating and deleting data

For example, let's change the find_one_or_none method and see how the change works in practice:

@classmethod
async def find_one_or_none(cls, session: AsyncSession, filters: BaseModel):
    # Find a single record matching the filters
    filter_dict = filters.model_dump(exclude_unset=True)
    try:
        query = select(cls.model).filter_by(**filter_dict)
        result = await session.execute(query)
        record = result.scalar_one_or_none()
        return record
    except SQLAlchemyError as e:
        raise

Instead of receiving filters directly through **kwargs, the method now expects an instance of a Pydantic BaseModel subclass.

Then we convert that model instance into a regular Python dictionary with model_dump. The exclude_unset=True parameter guarantees that only explicitly set fields end up in the dictionary.

Thanks to exclude_unset=True, fields that were never set explicitly and only hold their default values are excluded. This lets you build more flexible and precise queries that take into account only the filters the caller actually specified.
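To make the effect of exclude_unset=True concrete, here is a minimal sketch. The UserFilter model and its fields are hypothetical and exist only for this illustration; they are not part of the project code.

```python
from typing import Optional

from pydantic import BaseModel


class UserFilter(BaseModel):
    # Hypothetical filter model: every field has a default
    id: Optional[int] = None
    email: Optional[str] = None
    is_active: bool = True


# Only email is set explicitly; id and is_active keep their defaults
filters = UserFilter(email="bob.smith@example.com")

full_dump = filters.model_dump()
filter_dict = filters.model_dump(exclude_unset=True)

print(full_dump)    # also contains id=None and is_active=True
print(filter_dict)  # {'email': 'bob.smith@example.com'}
```

Without the flag, id=None and is_active=True would end up in filter_by and narrow the query in ways the caller never asked for.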

From here the logic is the same as in the old implementation. The resulting filter dictionary is applied to the query with filter_by(**filter_dict), and we get either the matching record or None.

Dynamic and regular models in Pydantic

Before we start testing the new approach in methods, I want to share a trick that allows you to generate simple Pydantic models on the fly. This will be useful when deep validation and description of fields is not required.

Pydantic provides the create_model function for this purpose. It lets you describe a model by passing simple field definitions to it. Here is a simple example:

from pydantic import create_model

MyModel = create_model('DynamicModel',
                       name=(str, ...),
                       age=(int, 42))

instance = MyModel(name="test")
print(instance.model_dump())  # dict() still works in Pydantic 2 but is deprecated
print(MyModel.__name__)  # Prints 'DynamicModel'
  • MyModel — a variable in which the created model is saved.

  • DynamicModel (first argument) – a string specifying the name of the model for internal representation and display of information about it.

This separation allows you to create models with the same names but different structures in different parts of the code, avoiding name conflicts.
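The sketch below illustrates that separation: two models share the internal name 'FilterModel' but have different fields. The variable names UserFilter and ProfileFilter are assumptions made up for this example.

```python
from pydantic import ValidationError, create_model

# Two unrelated models that share the same internal name
UserFilter = create_model("FilterModel", id=(int, ...))
ProfileFilter = create_model("FilterModel", last_name=(str, ...), age=(int, 18))

print(UserFilter.__name__, ProfileFilter.__name__)  # FilterModel FilterModel
print(list(UserFilter.model_fields))     # ['id']
print(list(ProfileFilter.model_fields))  # ['last_name', 'age']

# (type, ...) marks a required field, (type, default) an optional one
profile = ProfileFilter(last_name="Smith")
print(profile.age)  # 18

try:
    UserFilter()  # id is required, so validation fails
except ValidationError as exc:
    missing_errors = len(exc.errors())
    print(f"validation failed with {missing_errors} error(s)")
```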

Practical application of create_model

Previously we had this example of the old approach with find_one_or_none:

@connection
async def select_full_user_info_email(session, user_id, email):
    rez = await UserDAO.find_one_or_none(session=session, id=user_id, email=email)
    if rez:
        return UserPydantic.from_orm(rez).dict()
    return {'message': f'User with ID {user_id} not found!'}

A lot has changed since then. First, our decorator has become a decorator factory, so it must be used with parentheses. In this context we can pass commit=False, since we are only reading data from the database.
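For readers who skipped the previous part, here is a rough sketch of what a decorator factory with a commit flag can look like. It is not the project's real connection decorator: FakeSession is a hypothetical stand-in for AsyncSession so the example runs without a database.

```python
import asyncio
from functools import wraps


class FakeSession:
    """Stand-in for AsyncSession that only records what was called."""

    def __init__(self):
        self.committed = False
        self.closed = False

    async def commit(self):
        self.committed = True

    async def rollback(self):
        pass

    async def close(self):
        self.closed = True


def connection(commit: bool = True):
    """Decorator factory: connection(...) returns the actual decorator."""

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            session = FakeSession()  # real code would open an AsyncSession here
            try:
                result = await func(*args, session=session, **kwargs)
                if commit:
                    await session.commit()
                return result
            except Exception:
                await session.rollback()
                raise
            finally:
                await session.close()

        return wrapper

    return decorator


@connection(commit=False)
async def read_only(session):
    return session


@connection(commit=True)
async def write(session):
    return session


read_session = asyncio.run(read_only())
write_session = asyncio.run(write())
print(read_session.committed, write_session.committed)  # False True
```

Because connection is called first and returns the decorator, writing @connection without parentheses would no longer work.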

The find_one_or_none method no longer accepts kwargs. It now needs a Pydantic model populated with the right values.

Also, to convert data from a SQLAlchemy ORM object into a Pydantic model I used from_orm(), and dict() to produce a dictionary. In Pydantic 2 these methods have been renamed to model_validate() and model_dump(), although the previous names remain supported as deprecated aliases.
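A small sketch of the Pydantic 2 spelling. UserOut and FakeUserRow are hypothetical names for this illustration; FakeUserRow stands in for an ORM instance. Note that model_validate only reads object attributes when the model enables from_attributes.

```python
from pydantic import BaseModel, ConfigDict


class UserOut(BaseModel):
    # from_attributes=True lets model_validate read plain object attributes
    model_config = ConfigDict(from_attributes=True)

    id: int
    username: str


class FakeUserRow:
    """Stand-in for a SQLAlchemy ORM instance."""

    def __init__(self, id: int, username: str):
        self.id = id
        self.username = username


row = FakeUserRow(id=1, username="bob_smith")
user_dict = UserOut.model_validate(row).model_dump()
print(user_dict)  # {'id': 1, 'username': 'bob_smith'}
```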

Here's the final implementation:

@connection(commit=False)
async def select_full_user_info_email(session: AsyncSession, user_id: int, email: str):
    FilterModel = create_model(
        'FilterModel',
        id=(int, ...),
        email=(EmailStr, ...)
    )
    user = await UserDAO.find_one_or_none(session=session, filters=FilterModel(id=user_id, email=email))
    if user:
        # Convert the ORM model to a Pydantic model and then to a dictionary
        return UserPydantic.model_validate(user).model_dump()
    return {'message': f'User with ID {user_id} not found!'}

You can also describe the filter model as follows:

class FilterModel(BaseModel):
    id: int
    email: EmailStr

This option works the same way.

It takes a bit more code, but it makes clear to you and to other developers which parameters to pass and what to expect. That is why I strongly recommend using Pydantic in your projects.

Adding generics

Before we rewrite the remaining BaseDAO methods, let's make one more improvement to our base class: add generics.

First we'll write the code, and then we'll go through it.

from typing import Generic, TypeVar, List

from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession

from dao.database import Base

# Declare the type parameter T, restricted to subclasses of Base
T = TypeVar("T", bound=Base)


class BaseDAO(Generic[T]):
    model: type[T]

This change is due to improvements in typing and the use of generics in Python.

Let's look at the key aspects:

Generic[T]:

  • Makes the BaseDAO class generic.

  • Allows you to specify a concrete model type when inheriting.

T = TypeVar("T", bound=Base):

  • Creates a type variable T.

  • Restricts T to descendants of the Base class (SQLAlchemy models).

model: type[T]:

  • Indicates that the model attribute holds a class (T itself or a subclass of it), not an instance.

Advantages:

  • Improved static typing.

  • More accurate IDE hints.

  • Ability to use model-specific methods in child DAOs.

  • Preventing errors when working with the wrong model types.

This change makes the code more reliable and easier to develop, especially in large projects with many models and DAOs.

The change affects child classes as well. Now we don't just inherit from BaseDAO; we also specify the concrete model the child class works with.

Before:

class UserDAO(BaseDAO):
    model = User

After:

class UserDAO(BaseDAO[User]):
    model = User

Advantages of this approach:

  • Static typing: IDEs and static code analysis tools will be able to correctly determine the type of model the DAO operates on.

  • Type Safety: You will get warnings or errors if you try to use methods with the wrong types.

  • Improved hints: The IDE will be able to provide more accurate hints when working with DAO methods.
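Here is a database-free sketch of what the generic parameter buys us. Base and User are plain stand-in classes, and the build method is a hypothetical helper added only to show how the return type follows the model.

```python
from typing import Generic, TypeVar


class Base:
    """Stand-in for the SQLAlchemy declarative base."""


class User(Base):
    def __init__(self, username: str):
        self.username = username


T = TypeVar("T", bound=Base)


class BaseDAO(Generic[T]):
    model: type[T]

    @classmethod
    def build(cls, **values) -> T:
        # The return type follows the model bound in the subclass
        return cls.model(**values)


class UserDAO(BaseDAO[User]):
    model = User


user = UserDAO.build(username="bob_smith")
print(type(user).__name__, user.username)  # User bob_smith
```

With BaseDAO[User], a type checker such as mypy or Pyright should infer that UserDAO.build returns a User, so attribute access like user.username gets checked and autocompleted.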

Now we can rewrite all BaseDAO methods in the new format and move on.

from typing import Generic, TypeVar, List

from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession

from dao.database import Base

# Declare the type parameter T, restricted to subclasses of Base
T = TypeVar("T", bound=Base)


class BaseDAO(Generic[T]):
    model: type[T]

    @classmethod
    async def find_one_or_none_by_id(cls, data_id: int, session: AsyncSession):
        # Find a record by ID
        try:
            query = select(cls.model).filter_by(id=data_id)
            result = await session.execute(query)
            record = result.scalar_one_or_none()
            return record
        except SQLAlchemyError as e:
            raise

    @classmethod
    async def find_one_or_none(cls, session: AsyncSession, filters: BaseModel):
        # Find a single record matching the filters
        filter_dict = filters.model_dump(exclude_unset=True)
        try:
            query = select(cls.model).filter_by(**filter_dict)
            result = await session.execute(query)
            record = result.scalar_one_or_none()
            return record
        except SQLAlchemyError as e:
            raise

    @classmethod
    async def find_all(cls, session: AsyncSession, filters: BaseModel | None):
        if filters:
            filter_dict = filters.model_dump(exclude_unset=True)
        else:
            filter_dict = {}
        try:
            query = select(cls.model).filter_by(**filter_dict)
            result = await session.execute(query)
            records = result.scalars().all()
            return records
        except SQLAlchemyError as e:
            raise

    @classmethod
    async def add(cls, session: AsyncSession, values: BaseModel):
        # Add a single record
        values_dict = values.model_dump(exclude_unset=True)
        new_instance = cls.model(**values_dict)
        session.add(new_instance)
        try:
            await session.flush()
        except SQLAlchemyError as e:
            await session.rollback()
            raise e
        return new_instance

    @classmethod
    async def add_many(cls, session: AsyncSession, instances: List[BaseModel]):
        # Add several records
        values_list = [item.model_dump(exclude_unset=True) for item in instances]
        new_instances = [cls.model(**values) for values in values_list]
        session.add_all(new_instances)
        try:
            await session.flush()
        except SQLAlchemyError as e:
            await session.rollback()
            raise e
        return new_instances

As you can see, the changes are not drastic. We simply made the code more orderly, safer and more readable.

Updating data via SQLAlchemy

By this point you should have:

  • A PostgreSQL database with tables and populated rows

  • Table models described

  • A decorator for generating sessions

  • A base class and child classes for working with data in the database (the base class must contain methods for adding and retrieving data from tables)

If so, we can move on to the update methods.

When it comes to data updates, there are two main approaches in SQLAlchemy:

  1. Updating a Single Object

  2. Using the update() method

Updating a Single Object

This method uses the Unit of Work pattern and is ideal for updating a single record:

async def update_user(session: AsyncSession, user_id: int, new_name: str):
    user = await session.get(User, user_id)
    if user:
        user.name = new_name
        await session.commit()
        return user
    return None

In this approach:

  • We retrieve an object from the database

  • We change its attributes

  • SQLAlchemy tracks the changes

  • When session.commit() is called, the changes are saved to the database

The asynchronous get method

Note that I used the get method to fetch the data. In the last article I misled you a little: in the latest version of SQLAlchemy this method works asynchronously (on an AsyncSession it is awaited), whereas previously it was only synchronous.

The get method takes a table model and a primary key value. Notably, it does not matter what the primary key column is called: get substitutes the right column name automatically.

Example of using the method get:

@connection(commit=False)
async def get_select(session: AsyncSession, user_id: int):
    user = await session.get(User, user_id)
    print(UserPydantic.model_validate(user).model_dump())


asyncio.run(get_select(user_id=21))

Result:

{
    "username": "bob_smith",
    "email": "bob.smith@example.com",
    "profile": {
        "first_name": "Bob",
        "last_name": "Smith",
        "age": 25,
        "gender": "мужчина",
        "profession": "дизайнер",
        "interests": ["gaming", "photography", "traveling"],
        "contacts": {
            "phone": "+987654321",
            "email": "bob.smith@example.com"
        }
    }
}

Now we can change the BaseDAO method that returns a record or None by ID.

Before:

@classmethod
async def find_one_or_none_by_id(cls, data_id: int, session: AsyncSession):
    # Find a record by ID
    try:
        query = select(cls.model).filter_by(id=data_id)
        result = await session.execute(query)
        record = result.scalar_one_or_none()
        return record
    except SQLAlchemyError as e:
        raise

After:

@classmethod
async def find_one_or_none_by_id(cls, data_id: int, session: AsyncSession):
    # Find a record by ID
    try:
        return await session.get(cls.model, data_id)
    except SQLAlchemyError as e:
        print(f"Error occurred: {e}")
        raise

Check:

@connection(commit=False)
async def get_select(session: AsyncSession, user_id: int):
    user = await UserDAO.find_one_or_none_by_id(session=session, data_id=user_id)
    print(UserPydantic.model_validate(user).model_dump())


asyncio.run(get_select(user_id=21))

The result remains the same.

Using the update() method

This method is effective for bulk updates or when we don't need to load the object into memory:

from sqlalchemy import update


async def update_users_status(session: AsyncSession, age: int, new_status: str):
    stmt = (
        update(User)
        .where(User.age > age)
        .values(status=new_status)
    )
    result = await session.execute(stmt)
    await session.commit()
    return result.rowcount

Advantages of this method:

  • More efficient for bulk updates

  • Does not require pre-loading of objects

  • Allows complex conditions in where

As we can see, this again builds on the topic of selecting data that we covered earlier. You can use either where (filter is a synonym for it) or filter_by. The basic principle stays the same.

First determine which object or objects SQLAlchemy will work with, and then perform the update.

Universal methods for updating data

Now that we know the general approaches to updating information, we can write universal methods in BaseDAO.

The first method takes an ID and a Pydantic model with new values and performs the update.

@classmethod
async def update_one_by_id(cls, session: AsyncSession, data_id: int, values: BaseModel):
    values_dict = values.model_dump(exclude_unset=True)
    try:
        record = await session.get(cls.model, data_id)
        if record is None:
            # No row with this primary key, so there is nothing to update
            return
        for key, value in values_dict.items():
            setattr(record, key, value)
        await session.flush()
    except SQLAlchemyError as e:
        print(e)
        raise e

The update_one_by_id method shows a neat use of the object-oriented style for updating database records. Here's how it works:

  1. Getting the record object:

    • Using the unique identifier (data_id), we retrieve the corresponding record from the database.

    • The record is represented as a table model object, so we can work with it like a regular Python object.

  2. Updating object attributes:

    • The new values provided through the Pydantic model are applied to the object's attributes.

    • We use setattr() to update each attribute dynamically, which keeps the method flexible.

  3. Flushing changes:

    • After updating the attributes we call session.flush(), which sends the changes to the database without committing the transaction.

This approach combines the power of ORM (Object-Relational Mapping) with the flexibility of dynamically updating attributes.
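The attribute-update step can be tried in isolation. In this sketch UserUpdate is a hypothetical Pydantic model and FakeUser stands in for an ORM object returned by session.get(); neither exists in the project.

```python
from typing import Optional

from pydantic import BaseModel


class UserUpdate(BaseModel):
    # Hypothetical update model: all fields are optional
    username: Optional[str] = None
    email: Optional[str] = None


class FakeUser:
    """Stand-in for an ORM object loaded with session.get()."""

    def __init__(self):
        self.username = "old_name"
        self.email = "old@example.com"


record = FakeUser()
values = UserUpdate(username="new_name")  # email is left unset

# Only explicitly set fields are copied onto the object
for key, value in values.model_dump(exclude_unset=True).items():
    setattr(record, key, value)

print(record.username, record.email)  # new_name old@example.com
```

Without exclude_unset=True the loop would also write email=None and wipe a value the caller never meant to touch.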

Let's test the method

Let's create a file update_methods_dao.py and try to update the name of a specific user:

import asyncio

from pydantic import create_model
from sqlalchemy.ext.asyncio import AsyncSession

from dao.dao import UserDAO
from dao.session_maker import connection


@connection(commit=True)
async def update_username(session: AsyncSession, user_id: int, new_username: str):
    ValueModel = create_model('ValueModel', username=(str, ...))
    await UserDAO.update_one_by_id(session=session, data_id=user_id, values=ValueModel(username=new_username))


asyncio.run(update_username(user_id=1, new_username="yakvenalexx"))

Updating several fields by the passed ID

from pydantic import EmailStr


@connection(commit=True)
async def update_user(session: AsyncSession, user_id: int, new_username: str, email: str):
    ValueModel = create_model('ValueModel', username=(str, ...), email=(EmailStr, ...))
    await UserDAO.update_one_by_id(session=session, data_id=user_id, values=ValueModel(username=new_username, email=email))


asyncio.run(update_user(user_id=1, email="mail@mail.ru", new_username="admin"))

To update the username value from the users table and the age from the associated profiles table, we need a separate method in the child class UserDAO that states explicitly where each value lives. Otherwise the logic differs little from the universal BaseDAO method.

Here's a code example:

class UserDAO(BaseDAO[User]):
    model = User

    @classmethod
    async def update_username_age_by_id(cls, session: AsyncSession, data_id: int, username: str, age: int):
        user = await session.get(cls.model, data_id)
        user.username = username
        user.profile.age = age
        await session.flush()

Bulk update via SQLAlchemy update

An example of a bulk update that selects all users with the last name Smith and sets their age to 22:

@connection(commit=True)
async def update_age_mass(session: AsyncSession, new_age: int, last_name: str):
    try:
        stmt = (
            update(Profile)
            .filter_by(last_name=last_name)
            .values(age=new_age)
        )
        result = await session.execute(stmt)
        updated_count = result.rowcount
        print(f'Updated {updated_count} records')
        return updated_count
    except SQLAlchemyError as e:
        print(f"Error updating profiles: {e}")
        raise


asyncio.run(update_age_mass(new_age=22, last_name="Smith"))

Universal method for mass update

Based on this knowledge, we can develop a universal method for bulk updating records in our BaseDAO class.

@classmethod
async def update_many(cls, session: AsyncSession, filter_criteria: BaseModel, values: BaseModel):
    filter_dict = filter_criteria.model_dump(exclude_unset=True)
    values_dict = values.model_dump(exclude_unset=True)
    try:
        stmt = (
            update(cls.model)
            .filter_by(**filter_dict)
            .values(**values_dict)
        )
        result = await session.execute(stmt)
        await session.flush()
        return result.rowcount
    except SQLAlchemyError as e:
        print(f"Error in mass update: {e}")
        raise

Let's write a function based on it.

@connection(commit=True)
async def update_age_mass_dao(session: AsyncSession, new_age: int, last_name: str):
    filter_criteria = create_model('FilterModel', last_name=(str, ...))
    values = create_model('ValuesModel', age=(int, ...))
    await ProfileDAO.update_many(session=session,
                                 filter_criteria=filter_criteria(last_name=last_name),
                                 values=values(age=new_age))


asyncio.run(update_age_mass_dao(new_age=33, last_name="Smith"))

Note that ProfileDAO is already imported here. The call is quite readable and the code is easy to follow.

Deleting in OOP style

In the object-oriented approach, you first need to fetch the object from the database. This can be done with get() or select():

  • get() lets you quickly find a record by primary key (for example, id). This is useful when you need to delete a record whose ID is known in advance.

  • select() lets you build more complex queries, for example based on multiple fields, to select the objects you need.

Once you have the object, you can delete it with session.delete(obj) and then confirm the change by calling commit():

# Example of deleting a single object
obj = await session.get(User, 1)  # Get the object by primary key
if obj:
    await session.delete(obj)  # Delete the object
    await session.commit()  # Confirm the deletion

In this example, if an object with ID = 1 exists, it will be deleted. This approach is useful if you need to perform checks or other logic before deleting.

If you select multiple objects via select, deletion can be done in a loop:

# Get the objects that need to be deleted
result = await session.execute(select(User).where(User.age > 30))
users_to_delete = result.scalars().all()

# Delete the objects in a loop
for user in users_to_delete:
    await session.delete(user)

# Confirm the changes
await session.commit()

This method is suitable for selective deletion, but is less convenient for mass deletion of a large number of records.

Bulk delete at the database level

When you need to delete several records at once, it is better to perform a bulk delete on the database side. This approach saves resources because it avoids loading objects into memory.

For this we use the delete() construct, executed through the session, which lets SQLAlchemy send a single DELETE statement directly to the database:

# Example of a bulk delete
await session.execute(
    delete(User).where(User.age > 30)
)
await session.commit()  # Confirm the bulk delete

In this example, all user records older than 30 years are deleted in one request. This method is effective when there is no need to check before deleting each object, and performance is important.

Deletion in practice

Now let's put deleting records from our database into practice, and then add universal methods for deleting records from our tables. To write the delete functions, let's create a new file delete_methods_dao.py.

First, let's delete the user with ID 5. The code will look like this:

import asyncio

from sqlalchemy.ext.asyncio import AsyncSession

from dao.session_maker import connection
from models import User


@connection(commit=True)
async def delete_user_by_id(session: AsyncSession, user_id: int):
    user = await session.get(User, user_id)
    if user:
        await session.delete(user)


asyncio.run(delete_user_by_id(user_id=5))

As you can see, the code is as Pythonic as it gets. With the line user = await session.get(User, user_id) we get the user object, and then we delete it. There is no explicit commit here because the decorator handles it at the session level.

A universal method for BaseDAO suggests itself right away. Let's write it.

@classmethod
async def delete_one_by_id(cls, data_id: int, session: AsyncSession):
    # Find the record by ID and delete it
    try:
        data = await session.get(cls.model, data_id)
        if data:
            await session.delete(data)
            await session.flush()
    except SQLAlchemyError as e:
        print(f"Error occurred: {e}")
        raise

Now let's write a function with a new method:

@connection(commit=True)
async def delete_user_by_id_dao(session: AsyncSession, user_id: int):
    await UserDAO.delete_one_by_id(session=session, data_id=user_id)


asyncio.run(delete_user_by_id_dao(user_id=10))

Now let's write a method that deletes records based on a condition. For now it will also be an ordinary function, which we will then turn into a universal BaseDAO method.

Let's delete all users whose username starts with "ja". The code will look like this:

from sqlalchemy import delete


@connection(commit=True)
async def delete_user_username_ja(session: AsyncSession, start_letter: str = "ja"):
    stmt = delete(User).where(User.username.like(f"{start_letter}%"))
    await session.execute(stmt)


asyncio.run(delete_user_username_ja())

In this function we first build a DELETE statement:

stmt = delete(User).where(User.username.like(f"{start_letter}%"))

Next, we simply execute this statement with session.execute(). The approach is convenient because it combines flexibly with where / filter_by, and we don't need to load the data into the application before deleting it, as we do with get followed by await session.delete(obj).

Let's write a universal method for deleting records from the database, based on filter_by. Complex filters that need where (filter) will be moved to child classes when necessary.

The method will look like this:

@classmethod
async def delete_many(cls, session: AsyncSession, filters: BaseModel | None):
    if filters:
        filter_dict = filters.model_dump(exclude_unset=True)
        stmt = delete(cls.model).filter_by(**filter_dict)
    else:
        stmt = delete(cls.model)
    try:
        result = await session.execute(stmt)
        await session.flush()
        return result.rowcount
    except SQLAlchemyError as e:
        print(f"Error occurred: {e}")
        raise

I made the method universal, so it can also wipe an entire table. Be careful: if you pass no filters (no Pydantic model) when calling it, all data will be deleted.
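If that behavior feels too risky for your project, one option is a small guard that refuses to build an empty filter. This is a hypothetical helper, not part of the article's BaseDAO, and the UserFilter model is assumed for the example. Note that a filter model with no fields set also produces an empty dictionary under exclude_unset=True, so the guard checks the dictionary rather than the model.

```python
from typing import Optional

from pydantic import BaseModel


class UserFilter(BaseModel):
    # Hypothetical filter model for the example
    password: Optional[str] = None


def require_filters(filters: Optional[BaseModel]) -> dict:
    """Return the filter dict, refusing to continue when it is empty."""
    filter_dict = filters.model_dump(exclude_unset=True) if filters is not None else {}
    if not filter_dict:
        raise ValueError("Refusing to delete: no filters were provided")
    return filter_dict


print(require_filters(UserFilter(password="asdasd")))  # {'password': 'asdasd'}

for empty in (None, UserFilter()):
    try:
        require_filters(empty)
    except ValueError as exc:
        print(exc)  # Refusing to delete: no filters were provided
```

A method for deliberately clearing a table could then live under a separate, explicit name.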

Let's check.

@connection(commit=True)
async def delete_user_by_password(session: AsyncSession, password: str):
    filter_criteria = create_model('FilterModel', password=(str, ...))
    await UserDAO.delete_many(session=session, filters=filter_criteria(password=password))


asyncio.run(delete_user_by_password(password='asdasd'))

With this call we removed all users with the password "asdasd".

To delete all user data, you could write a function like this:

@connection(commit=True)
async def delete_all_users(session: AsyncSession):
    await UserDAO.delete_many(session=session, filters=None)

Conclusion

Today we took a closer look at the key aspects of working with SQLAlchemy, focusing on delete and update operations. I tried to convey the material in such a way that you would form not only an understanding of specific actions, but also a holistic understanding of the principles of working with SQLAlchemy.

Although the framework may seem multi-layered and complex, SQLAlchemy remains a flexible and user-friendly tool that can be learned by anyone. The main thing is to understand the basic concepts of classes and gradually delve into practical work with code. If you've taken a good look at the previous articles on asynchronous SQLAlchemy 2 and today's article, you already have all the knowledge you need to confidently use tabular databases in Python. Further development is just a matter of experience and practice.

We have discussed two main approaches to updating and deleting data:

  1. The object-oriented approach, in which data is loaded onto the application side and processed as objects. This method is intuitive and convenient, but may require more resources.

  2. Direct execution of SQL statements on the database side, which helps optimize the use of application resources, especially during bulk operations.

The full source code of the project and exclusive materials are available in my Telegram channel "The Easy Way to Python", which already has more than 1000 like-minded people. And if you need to deploy a project on a server and deliver updates to it with three commands in the IDE, register with Amvera Cloud and get 111 rubles for testing the functionality.

There are plans for several more articles on asynchronous work with SQLAlchemy. The continuation of this series will depend on your response and interest.

Thank you for your attention! See you soon!
