Experience implementing DDD in a FastAPI project – Part 1

Hello, Khabravians!

In this series of articles, I will explain what DDD (domain-driven design) is and what its advantages and disadvantages are. We will figure out when to use the approach and how to combine it with FastAPI, a popular ASGI framework for Python.

In the first part, we will look at the Repository and Unit of Work design patterns, which let us work through interfaces. The patterns help divide the code into layers: the core logic of the application lives in the inner layers, and the technologies it uses live in the outer ones.

In the second part, we will discuss how to bring the project to an even more scalable and flexible state using event-driven architecture.

For clarity, let's implement DDD in a rating application, in which users register to rate other people and keep track of their own rating. The code example can be found in the project repository.

A few words about DDD

At the heart of DDD is the domain: a model of the objects the application is built for, together with their tasks. For example, the bill we pay, the message we send, or the user we rate.

Domains are built on real-world entities and sit at the center of the application. The graphical interface, the data storage, and other implementation-specific components live in the outer layers and depend on the domain. In DDD, you cannot design the database tables first and then adjust the domain to them: the domain determines the database implementation.

DDD gives you the freedom to change technologies. It becomes easier to change the framework or to switch from SQL to a NoSQL solution. Switching to a new tool changes only the way the application interacts with the domain; the domain itself, and the structure and logic of the application built on it, are not affected. This works because the domain is the inner layer of the application, while the database belongs to one of the outer layers. In DDD, the outer layers depend on the inner ones, never the other way round.

DDD diagram
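The dependency rule described above can be sketched in a few lines. This is a minimal, self-contained illustration, not code from the project: the names User, UserStorage, InMemoryUserStorage, SQLiteUserStorage and register are hypothetical. The domain model and the storage interface (a typing.Protocol) know nothing about databases, while two interchangeable adapters in the outer layer, a plain dict and SQLite via the standard sqlite3 module, depend on the domain model.

```python
import sqlite3
from dataclasses import dataclass
from typing import Dict, Optional, Protocol


# Inner layer: the domain model knows nothing about storage.
@dataclass
class User:
    email: str
    username: str


# Inner layer: the interface the application logic depends on (hypothetical name).
class UserStorage(Protocol):
    def save(self, user: User) -> None: ...
    def find(self, email: str) -> Optional[User]: ...


# Outer layer, option 1: an in-memory dictionary.
class InMemoryUserStorage:
    def __init__(self) -> None:
        self._users: Dict[str, User] = {}

    def save(self, user: User) -> None:
        self._users[user.email] = user

    def find(self, email: str) -> Optional[User]:
        return self._users.get(email)


# Outer layer, option 2: SQLite. The adapter depends on the domain model, not vice versa.
class SQLiteUserStorage:
    def __init__(self, connection: sqlite3.Connection) -> None:
        self._conn = connection
        self._conn.execute("CREATE TABLE IF NOT EXISTS users (email TEXT PRIMARY KEY, username TEXT)")

    def save(self, user: User) -> None:
        self._conn.execute("INSERT INTO users (email, username) VALUES (?, ?)", (user.email, user.username))

    def find(self, email: str) -> Optional[User]:
        row = self._conn.execute("SELECT email, username FROM users WHERE email = ?", (email,)).fetchone()
        return User(*row) if row else None


# Application logic written only against the interface.
def register(storage: UserStorage, email: str, username: str) -> User:
    user = User(email=email.lower(), username=username)
    storage.save(user)
    return user
```

Switching the application from one storage to the other only changes which adapter is passed to register(); neither User nor register() has to be edited.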

About Repository and Unit of Work patterns

Initially, the application has a standard structure that follows the best practices for developing with FastAPI.

We have three models (src/users/models.py) that reflect the key entities of the program:

  • User (UserModel);

  • User statistics (UserStatisticsModel);

  • Voting history (UserVoteModel).

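from sqlalchemy import ForeignKey, Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    """Declarative base class (assumed here; the project may define it in a shared module)."""


class UserModel(Base):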
    __tablename__ = 'users'

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String, unique=True)
    password: Mapped[str] = mapped_column(String)
    username: Mapped[str] = mapped_column(String, unique=True)


class UserStatisticsModel(Base):
    __tablename__ = 'users_statistics'

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey('users.id', onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False
    )
    likes: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    dislikes: Mapped[int] = mapped_column(Integer, nullable=False, default=0)


class UserVoteModel(Base):
    __tablename__ = 'users_votes'

    id: Mapped[int] = mapped_column(primary_key=True)
    voted_for_user_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey('users.id', onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False
    )
    voting_user_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey('users.id', onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False
    )

The models are actively used throughout the application and pass through the service, dependency, and endpoint layers. The code is good, the tests pass, the program works.

However, we built the application's architecture in such a way that its inner layers inevitably communicate with the ORM. For example, the service layer (src/users/service.py) has to use SQLAlchemy to get user data:

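from typing import Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker

# default_session_factory, UserModel, UserStatisticsModel and UserNotFoundError
# come from the project's own modules.


class UsersService: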

    def __init__(self, session_factory: async_sessionmaker = default_session_factory) -> None:
        self._session_factory: async_sessionmaker = session_factory

    async def register_user(self, user: UserModel) -> UserModel:
        async with self._session_factory() as session:
            session.add(user)
            await session.flush()
            session.add(UserStatisticsModel(user_id=user.id))
            await session.commit()
            return user

    async def get_user_by_email(self, email: str) -> UserModel:
        async with self._session_factory() as session:
            user: Optional[UserModel] = (await session.scalars(select(UserModel).filter_by(email=email))).one_or_none()
            if not user:
                raise UserNotFoundError

            return user

...

Imagine the project manager comes to you and says: “Guys, we can't use SQLAlchemy anymore, the customer is against ORMs. We urgently need to switch to raw SQL!” You would have to comb through the entire application and spend dozens of hours refactoring, just so that the program keeps working after the change of technology.

The transition would be far less painful with DDD. The domain would not be tied to the way its data is stored (here, an ORM), so it could be used throughout the application without the fear of having to make corrections across the entire code base. With DDD, the models (src/users/domain/models.py) look like this:

from dataclasses import dataclass

from src.core.interfaces.models import AbstractModel


@dataclass
class UserModel(AbstractModel):
    email: str
    password: str
    username: str

    # Optional args:
    id: int = 0


@dataclass
class UserStatisticsModel(AbstractModel):
    user_id: int

    # Optional args:
    id: int = 0
    likes: int = 0
    dislikes: int = 0


@dataclass
class UserVoteModel(AbstractModel):
    voting_user_id: int  # Who votes
    voted_for_user_id: int  # Votes for who

    # Optional args:
    id: int = 0

AbstractModel (src/core/interfaces/models.py) is the base class from which all other domain models in the application inherit. It implements a single method, to_dict(), which resembles model_dump() from Pydantic's BaseModel and converts a model into a Python dictionary. This allows the domain representation to be passed to other objects using unpacking (**).

from abc import ABC
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional, Set


@dataclass
class AbstractModel(ABC):
    """
    Base model, from which any domain model should be inherited.
    """

    async def to_dict(
            self,
            exclude: Optional[Set[str]] = None,
            include: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:

        """
        Create a dictionary representation of the model.

        exclude: set of model fields, which should be excluded from the dictionary representation.
        include: mapping of extra fields (name -> value), which should be added to (or override values in)
                 the dictionary representation.
        """

        data: Dict[str, Any] = asdict(self)
        if exclude:
            for key in exclude:
                # Silently skip names that are not fields of the model:
                data.pop(key, None)

        if include:
            data.update(include)

        return data
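Since to_dict() is a coroutine, it has to be awaited. The following self-contained sketch reproduces a condensed copy of the base class and shows exclude, include and unpacking in action; UserPublicView is a hypothetical class used only for the demonstration, not part of the project.

```python
import asyncio
from abc import ABC
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional, Set


@dataclass
class AbstractModel(ABC):
    # Condensed copy of the base model, so that the snippet runs on its own.
    async def to_dict(
            self,
            exclude: Optional[Set[str]] = None,
            include: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        data = asdict(self)
        for key in exclude or set():
            data.pop(key, None)
        data.update(include or {})
        return data


@dataclass
class UserModel(AbstractModel):
    email: str
    password: str
    username: str
    id: int = 0


# Hypothetical object that receives domain data via unpacking.
@dataclass
class UserPublicView:
    email: str
    username: str


async def main() -> UserPublicView:
    user = UserModel(email="ann@example.com", password="secret", username="ann", id=7)
    # Drop the fields the receiving object does not accept, then unpack the rest:
    data = await user.to_dict(exclude={"password", "id"})
    return UserPublicView(**data)


view = asyncio.run(main())
print(view)  # UserPublicView(email='ann@example.com', username='ann')
```

The include argument works the other way round: it adds or overrides keys, which is handy when, for example, a storage layer assigns the id.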

The database structure is now described in a separate file src/users/adapters/orm.py:

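from sqlalchemy import Column, ForeignKey, Integer, String, Table
from sqlalchemy.orm import registry

# Registry that holds the table metadata and the imperative mappings:
mapper_registry = registry()

users_table = Table(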
    'users',
    mapper_registry.metadata,
    Column('id', Integer, primary_key=True, autoincrement=True, nullable=False, unique=True),
    Column('email', String, nullable=False, unique=True),
    Column('password', String, nullable=False),
    Column('username', String, nullable=False, unique=True),
)

users_statistics_table = Table(
    'users_statistics',
    mapper_registry.metadata,
    Column('id', Integer, primary_key=True, autoincrement=True, nullable=False, unique=True),
    Column(
        'user_id',
        Integer,
        ForeignKey('users.id', onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False
    ),
    Column('likes', Integer, nullable=False, default=0),
    Column('dislikes', Integer, nullable=False, default=0),
)

users_votes_table = Table(
    'users_votes',
    mapper_registry.metadata,
    Column('id', Integer, primary_key=True, autoincrement=True, nullable=False, unique=True),
    Column(
        'voted_for_user_id',
        Integer,
        ForeignKey('users.id', onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False
    ),
    Column(
        'voting_user_id',
        Integer,
        ForeignKey('users.id', onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False
    )
)

The imperative mapping style may seem more complicated than the declarative one. However, it is just as capable; you just need to get used to it.

The domain models are now linked to these SQLAlchemy tables through imperative mapping:

def start_mappers():
    """
    Map all domain models to the database tables, so that domain models can be used directly
    when working with the database, according to DDD.
    """

    # Imported here so as not to break Alembic's logic; these classes are only needed for the mappers:
    from src.users.domain.models import UserModel, UserStatisticsModel, UserVoteModel

    mapper_registry.map_imperatively(class_=UserModel, local_table=users_table)
    mapper_registry.map_imperatively(class_=UserStatisticsModel, local_table=users_statistics_table)
    mapper_registry.map_imperatively(class_=UserVoteModel, local_table=users_votes_table)

The function is called at application startup, as part of the application's lifespan (src/app.py):

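from contextlib import asynccontextmanager
from typing import AsyncGenerator

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlalchemy.orm import clear_mappers

from src.users.adapters.orm import start_mappers as start_users_mappers

# DATABASE_URL and metadata (the MetaData that holds the tables, e.g. mapper_registry.metadata)
# come from the project's own modules.


@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    """
    Runs events before application startup and after application shutdown.
    """

    # Startup events: create the tables, then map the domain models to them.
    engine: AsyncEngine = create_async_engine(DATABASE_URL)
    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)

    start_users_mappers()

    yield

    # Shutdown events:
    clear_mappers()
    await engine.dispose()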

Let's implement the Repository pattern: an object that provides access to the model. The pattern lets you work with the database through abstractions and dependency injection, so the implementation of database access can be changed without touching the rest of the code. We will have a repository interface with the most common methods (src/core/interfaces/repositories.py), from which the concrete repositories will inherit:

from abc import ABC, abstractmethod
from typing import List, Optional

from src.core.interfaces.models import AbstractModel


class AbstractRepository(ABC):
    """
    Interface for any repository used to work with a domain model, according to DDD.

    Its main purpose is to encapsulate the internal logic associated with a particular data
    storage scheme, for example, an ORM.
    """

    @abstractmethod
    async def add(self, model: AbstractModel) -> AbstractModel:
        raise NotImplementedError

    @abstractmethod
    async def get(self, id: int) -> Optional[AbstractModel]:
        raise NotImplementedError

    @abstractmethod
    async def update(self, id: int, model: AbstractModel) -> AbstractModel:
        raise NotImplementedError

    @abstractmethod
    async def delete(self, id: int) -> None:
        raise NotImplementedError

    @abstractmethod
    async def list(self) -> List[AbstractModel]:
        raise NotImplementedError
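The value of @abstractmethod here is that Python refuses to instantiate a repository that forgets part of the contract, while any complete implementation can be used wherever the interface is expected. A minimal sketch, assuming a condensed interface and a hypothetical Note model and InMemoryRepository (none of them are part of the project):

```python
import asyncio
import itertools
from abc import ABC, abstractmethod
from dataclasses import dataclass, replace
from typing import Dict, List, Optional


@dataclass
class Note:
    """Hypothetical domain model, used only for this sketch."""
    text: str
    id: int = 0


class AbstractRepository(ABC):
    """Condensed version of the repository interface."""

    @abstractmethod
    async def add(self, model: Note) -> Note: ...

    @abstractmethod
    async def get(self, id: int) -> Optional[Note]: ...

    @abstractmethod
    async def list(self) -> List[Note]: ...


class InMemoryRepository(AbstractRepository):
    def __init__(self) -> None:
        self._rows: Dict[int, Note] = {}
        self._ids = itertools.count(start=1)  # emulates an autoincrement primary key

    async def add(self, model: Note) -> Note:
        stored = replace(model, id=next(self._ids))  # store a copy that carries the new id
        self._rows[stored.id] = stored
        return stored

    async def get(self, id: int) -> Optional[Note]:
        return self._rows.get(id)

    async def list(self) -> List[Note]:
        # Inside a method, `list` is the builtin: class attributes are not in scope here.
        return list(self._rows.values())


class IncompleteRepository(AbstractRepository):
    """Forgets to implement get() and list()."""

    async def add(self, model: Note) -> Note:
        return model


async def demo() -> List[Note]:
    repo: AbstractRepository = InMemoryRepository()
    await repo.add(Note(text="first"))
    await repo.add(Note(text="second"))
    return await repo.list()


notes = asyncio.run(demo())
print([note.id for note in notes])  # [1, 2]

try:
    IncompleteRepository()
except TypeError:
    print("IncompleteRepository cannot be instantiated")
```

The error is raised at instantiation time, so a missing method shows up as soon as the object is created rather than when the method is first called.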

We are working with SQLAlchemy and want to implement repositories using it. Let's create a class inheriting from the abstract base repository (src/core/database/interfaces/repositories.py):

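from abc import ABC

from sqlalchemy.ext.asyncio import AsyncSession

from src.core.interfaces.repositories import AbstractRepository


class SQLAlchemyAbstractRepository(AbstractRepository, ABC):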
    """
    Repository interface for SQLAlchemy, from which should be inherited all other repositories,
    which would be based on SQLAlchemy logics.
    """

    def __init__(self, session: AsyncSession) -> None:
        self._session: AsyncSession = session

The users repository class is itself an interface, so code that depends on it is not tied to SQLAlchemy (src/users/interfaces/repositories.py):

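from abc import ABC, abstractmethod
from typing import List, Optional

from src.core.interfaces.models import AbstractModel
from src.core.interfaces.repositories import AbstractRepository
from src.users.domain.models import UserModel


class UsersRepository(AbstractRepository, ABC):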
    """
    An interface for work with users, that is used by users unit of work.
    The main goal is that implementations of this interface can be easily replaced in users unit of work
    using dependency injection without disrupting its functionality.
    """

    @abstractmethod
    async def get_by_email(self, email: str) -> Optional[UserModel]:
        raise NotImplementedError

    @abstractmethod
    async def get_by_username(self, username: str) -> Optional[UserModel]:
        raise NotImplementedError

    @abstractmethod
    async def add(self, model: AbstractModel) -> UserModel:
        raise NotImplementedError

    @abstractmethod
    async def get(self, id: int) -> Optional[UserModel]:
        raise NotImplementedError

    @abstractmethod
    async def update(self, id: int, model: AbstractModel) -> UserModel:
        raise NotImplementedError

    @abstractmethod
    async def list(self) -> List[UserModel]:
        raise NotImplementedError

The concrete implementation based on SQLAlchemy (src/users/adapters/repositories.py):

from typing import List, Optional

from sqlalchemy import delete, insert, select, update
from sqlalchemy.engine import Result

from src.core.database.interfaces.repositories import SQLAlchemyAbstractRepository
from src.core.interfaces.models import AbstractModel
from src.users.domain.models import UserModel
from src.users.interfaces.repositories import UsersRepository


class SQLAlchemyUsersRepository(SQLAlchemyAbstractRepository, UsersRepository):

    async def get(self, id: int) -> Optional[UserModel]:
        result: Result = await self._session.execute(select(UserModel).filter_by(id=id))
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> Optional[UserModel]:
        result: Result = await self._session.execute(select(UserModel).filter_by(email=email))
        return result.scalar_one_or_none()

    async def get_by_username(self, username: str) -> Optional[UserModel]:
        result: Result = await self._session.execute(select(UserModel).filter_by(username=username))
        return result.scalar_one_or_none()

    async def add(self, model: AbstractModel) -> UserModel:
        result: Result = await self._session.execute(
            insert(UserModel).values(**await model.to_dict(exclude={'id'})).returning(UserModel)
        )

        return result.scalar_one()

    async def update(self, id: int, model: AbstractModel) -> UserModel:
        result: Result = await self._session.execute(
            update(UserModel).filter_by(id=id).values(**await model.to_dict(exclude={'id'})).returning(UserModel)
        )

        return result.scalar_one()

    async def delete(self, id: int) -> None:
        await self._session.execute(delete(UserModel).filter_by(id=id))

    async def list(self) -> List[UserModel]:
        """
        Returns the mapped objects themselves instead of converting them into new objects via
                    [UserModel(**await r.to_dict()) for r in result.scalars().all()]
        to avoid sqlalchemy.orm.exc.UnmappedInstanceError later on.

        The asserts check that the actual return type matches the expected one.
        """

        result: Result = await self._session.execute(select(UserModel))
        # Inside the method, `list` is the builtin, not this method:
        users: List[UserModel] = list(result.scalars().all())
        for user in users:
            assert isinstance(user, UserModel)

        return users

Each model has its own repository interface and repository implementation. At first glance, this breaks transaction atomicity: in the original service layer, the user registration method created both a user object and an initial statistics record within a single session:

class UsersService:

    def __init__(self, session_factory: async_sessionmaker = default_session_factory) -> None:
        self._session_factory: async_sessionmaker = session_factory

    async def register_user(self, user: UserModel) -> UserModel:
        async with self._session_factory() as session:
            session.add(user)
            await session.flush()
            session.add(UserStatisticsModel(user_id=user.id))
            await session.commit()
            return user
...

In fact, atomicity is preserved because a SQLAlchemy session object is passed in when a repository inheriting from SQLAlchemyAbstractRepository is initialized. You can pass a single session to both repositories and, if the operations succeed, commit outside of both repositories.
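The same principle can be checked with nothing but the standard sqlite3 module. In this sketch, two hypothetical repositories (UsersRepo and StatisticsRepo, stand-ins for the SQLAlchemy ones) receive one shared connection, neither of them commits on its own, and the caller decides whether both inserts are committed or rolled back together. It illustrates the idea only; it is not the project's SQLAlchemy code.

```python
import sqlite3


class UsersRepo:  # hypothetical stand-in for SQLAlchemyUsersRepository
    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn

    def add(self, email: str) -> int:
        cursor = self._conn.execute("INSERT INTO users (email) VALUES (?)", (email,))
        return cursor.lastrowid


class StatisticsRepo:  # hypothetical stand-in for SQLAlchemyUsersStatisticsRepository
    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn

    def add(self, user_id: int) -> None:
        self._conn.execute("INSERT INTO users_statistics (user_id) VALUES (?)", (user_id,))


def register(conn: sqlite3.Connection, email: str, fail_midway: bool = False) -> None:
    # Both repositories share one connection, hence one transaction.
    users, statistics = UsersRepo(conn), StatisticsRepo(conn)
    try:
        user_id = users.add(email)
        if fail_midway:
            raise RuntimeError("simulated failure after the first insert")
        statistics.add(user_id)
        conn.commit()  # a single commit, outside both repositories
    except Exception:
        conn.rollback()  # also undoes the insert into users
        raise


conn = sqlite3.connect(":memory:")
conn.executescript(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE);"
    "CREATE TABLE users_statistics (id INTEGER PRIMARY KEY, user_id INTEGER, likes INTEGER DEFAULT 0);"
)
register(conn, "ann@example.com")
try:
    register(conn, "bob@example.com", fail_midway=True)
except RuntimeError:
    pass

print(conn.execute("SELECT email FROM users").fetchall())  # [('ann@example.com',)]
```

The failed registration leaves no half-created user behind, because the rollback covers everything executed on the shared connection since the last commit.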

However, initializing repositories this way would force the service layer to create a session and pass it in, which would make the service layer dependent on SQLAlchemy again. The Unit of Work pattern solves this problem.

The name Unit of Work hints at the pattern's task: managing the atomicity of operations.

In Python, a context manager saves you from having to release a resource manually, so we use one to implement the Unit of Work.

Working with a Unit of Work is similar to working with repositories: there is a base class and implementation classes. The difference is that what changes between implementations is not the methods but the class attributes, namely the repositories. Let's look at the base Unit of Work class (src/core/interfaces/units_of_work.py):

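from abc import ABC, abstractmethod
from typing import Self  # Python 3.11+; on older versions use typing_extensions.Self


class AbstractUnitOfWork(ABC):
    """
    Interface for any unit of work used to keep transactions atomic, according to DDD.
    """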

    async def __aenter__(self) -> Self:
        return self

    async def __aexit__(self, *args, **kwargs) -> None:
        await self.rollback()

    @abstractmethod
    async def commit(self) -> None:
        raise NotImplementedError

    @abstractmethod
    async def rollback(self) -> None:
        raise NotImplementedError

Besides the “magic” methods __aenter__ and __aexit__, which implement the asynchronous context manager protocol, the class requires only two methods:

  • commit to confirm operations with repositories;

  • rollback to roll back changes if something goes wrong.
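To see why __aexit__ can call rollback() unconditionally, here is a minimal in-memory sketch. The InMemoryUnitOfWork class and its dictionaries are hypothetical, not from the project: changes are staged in pending, commit() publishes them, and the rollback() on exit discards only what has not been committed, whether the block ends normally or with an exception.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, TypeVar

UoW = TypeVar("UoW", bound="AbstractUnitOfWork")


class AbstractUnitOfWork(ABC):
    async def __aenter__(self: UoW) -> UoW:
        return self

    async def __aexit__(self, *args: Any) -> None:
        # Runs on every exit, normal or exceptional; after commit() there is nothing left to undo.
        await self.rollback()

    @abstractmethod
    async def commit(self) -> None: ...

    @abstractmethod
    async def rollback(self) -> None: ...


class InMemoryUnitOfWork(AbstractUnitOfWork):
    def __init__(self, storage: Dict[str, Any]) -> None:
        self.storage = storage  # the "database", shared between units of work
        self.pending: Dict[str, Any] = {}  # changes staged inside the current block

    async def commit(self) -> None:
        self.storage.update(self.pending)
        self.pending.clear()

    async def rollback(self) -> None:
        self.pending.clear()


async def main() -> Dict[str, Any]:
    storage: Dict[str, Any] = {}

    async with InMemoryUnitOfWork(storage) as uow:
        uow.pending["ann"] = 1
        await uow.commit()  # persisted

    async with InMemoryUnitOfWork(storage) as uow:
        uow.pending["bob"] = 2  # no commit: discarded on exit

    try:
        async with InMemoryUnitOfWork(storage) as uow:
            uow.pending["eve"] = 3
            raise ValueError("something went wrong")
    except ValueError:
        pass  # __aexit__ rolled back before the exception propagated

    return storage


print(asyncio.run(main()))  # {'ann': 1}
```

Because __aexit__ returns None, exceptions are not swallowed: the caller still sees the error, but the staged changes are already gone.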

Now let's look at the abstract Unit of Work class tied to SQLAlchemy. The concrete Unit of Work implementations will inherit from it (src/core/database/interfaces/units_of_work.py):

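from typing import Self

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from src.core.interfaces.units_of_work import AbstractUnitOfWork

# default_session_factory is the project's default async_sessionmaker.


class SQLAlchemyAbstractUnitOfWork(AbstractUnitOfWork):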
    """
    Unit of work interface for SQLAlchemy, from which should be inherited all other units of work,
    which would be based on SQLAlchemy logics.
    """

    def __init__(self, session_factory: async_sessionmaker = default_session_factory) -> None:
        super().__init__()
        self._session_factory: async_sessionmaker = session_factory

    async def __aenter__(self) -> Self:
        self._session: AsyncSession = self._session_factory()
        return await super().__aenter__()

    async def __aexit__(self, *args, **kwargs) -> None:
        await super().__aexit__(*args, **kwargs)
        await self._session.close()

    async def commit(self) -> None:
        await self._session.commit()

    async def rollback(self) -> None:
        """
        Rolls back all uncommitted changes.

        Uses self._session.expunge_all() to avoid sqlalchemy.orm.exc.DetachedInstanceError after session rollback,
        due to the fact that selected object is cached by Session. And self._session.rollback() deletes all Session
        cache, which causes error on Domain model, which is not bound now to the session and can not retrieve
        attributes.

        https://pythonhint.com/post/1123713161982291/how-does-a-sqlalchemy-object-get-detached
        """

        self._session.expunge_all()
        await self._session.rollback()

What sets this class apart is its connection to SQLAlchemy: it creates the AsyncSession object that the repository implementations require.

As with repositories, the service layer works with an interface, so it is not coupled to the implementation (src/users/interfaces/units_of_work.py):

class UsersUnitOfWork(AbstractUnitOfWork, ABC):
    """
    An interface for work with users, that is used by service layer of users module.
    The main goal is that implementations of this interface can be easily replaced in the service layer
    using dependency injection without disrupting its functionality.
    """

    users: UsersRepository
    users_statistics: UsersStatisticsRepository
    users_votes: UsersVotesRepository

The distinctive feature of such interfaces is that they declare class attributes through which the repositories are accessed while working with the Unit of Work.

Let's create a concrete implementation required for the application to work (src/users/units_of_work.py):

class SQLAlchemyUsersUnitOfWork(SQLAlchemyAbstractUnitOfWork, UsersUnitOfWork):

    async def __aenter__(self) -> Self:
        uow = await super().__aenter__()
        self.users: UsersRepository = SQLAlchemyUsersRepository(session=self._session)
        self.users_statistics: UsersStatisticsRepository = SQLAlchemyUsersStatisticsRepository(session=self._session)
        self.users_votes: UsersVotesRepository = SQLAlchemyUsersVotesRepository(session=self._session)
        return uow

In the implementation, we create the required repository implementations, all sharing one session. If necessary, we can pass a non-default session_factory (an async_sessionmaker instance from SQLAlchemy) to __init__ when creating an instance of SQLAlchemyUsersUnitOfWork.

Here is the service layer, which no longer depends on the implementation and is now a self-contained unit (src/users/service.py):

class UsersService:
    """
    Service layer core according to DDD, which using a unit of work, will perform operations on the domain model.
    """

    def __init__(self, uow: UsersUnitOfWork) -> None:
        self._uow: UsersUnitOfWork = uow

    async def register_user(self, user: UserModel) -> UserModel:
        async with self._uow as uow:
            user = await uow.users.add(model=user)
            await uow.users_statistics.add(
                model=UserStatisticsModel(
                    user_id=user.id
                )
            )

            await uow.commit()
            return user

    async def get_user_by_email(self, email: str) -> UserModel:
        async with self._uow as uow:
            user: Optional[UserModel] = await uow.users.get_by_email(email)
            if not user:
                raise UserNotFoundError

            return user

Now the service layer works with the domain through the Unit of Work, which it receives via dependency injection at initialization. This means that in tests we can substitute a fake Unit of Work, which is clearer than a “magic” Mock object with its hidden internals (tests/users/fake_objects.py):

class FakeUsersUnitOfWork(UsersUnitOfWork):

    def __init__(
            self,
            users_repository: UsersRepository,
            users_statistics_repository: UsersStatisticsRepository,
            users_votes_repository: UsersVotesRepository
    ) -> None:

        super().__init__()
        self.users: UsersRepository = users_repository
        self.users_statistics: UsersStatisticsRepository = users_statistics_repository
        self.users_votes: UsersVotesRepository = users_votes_repository
        self.committed: bool = False

    async def commit(self) -> None:
        self.committed = True

    async def rollback(self) -> None:
        pass

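When testing the service, we pass it a fake Unit of Work at initialization (tests/users/unit/test_service.py):

import pytest

# FakeUsersUnitOfWork and the fake repositories live in tests/users/fake_objects.py;
# UsersService, UserModel and FakeUserConfig come from the project.


@pytest.mark.anyio
async def test_users_service_register_user_success() -> None:
    # Keep references to the fake repositories, so that the test can inspect them afterwards:
    users_repository: FakeUsersRepository = FakeUsersRepository()
    users_statistics_repository: FakeUsersStatisticsRepository = FakeUsersStatisticsRepository()
    users_unit_of_work: FakeUsersUnitOfWork = FakeUsersUnitOfWork(
        users_repository=users_repository,
        users_statistics_repository=users_statistics_repository,
        users_votes_repository=FakeUsersVotesRepository()
    )
    users_service: UsersService = UsersService(uow=users_unit_of_work)

    assert len(await users_repository.list()) == 0
    user: UserModel = UserModel(**FakeUserConfig().to_dict(to_lower=True))
    await users_service.register_user(user=user)
    assert len(await users_repository.list()) == 1
    assert len(await users_statistics_repository.list()) == 1
    assert users_unit_of_work.committed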

We can also fake repositories, because Unit of Work works with the interfaces that were defined for it (tests/users/fake_objects.py):

class FakeUsersRepository(UsersRepository):

    def __init__(self, users: Optional[Dict[int, UserModel]] = None) -> None:
        self.users: Dict[int, UserModel] = users if users else {}

    async def get(self, id: int) -> Optional[UserModel]:
        return self.users.get(id)

    async def get_by_email(self, email: str) -> Optional[UserModel]:
        for user in self.users.values():
            if user.email == email:
                return user

        return None

    async def get_by_username(self, username: str) -> Optional[UserModel]:
        for user in self.users.values():
            if user.username == username:
                return user

        return None

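    async def add(self, model: AbstractModel) -> UserModel:
        user: UserModel = UserModel(**await model.to_dict())
        if not user.id:
            # Emulate the database's autoincrement; otherwise every new user would be stored
            # under id 0 and overwrite the previous one:
            user.id = max(self.users, default=0) + 1

        self.users[user.id] = user
        return user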

    async def update(self, id: int, model: AbstractModel) -> UserModel:
        user: UserModel = UserModel(**await model.to_dict())
        if id in self.users:
            self.users[id] = user

        return user

    async def delete(self, id: int) -> None:
        if id in self.users:
            del self.users[id]

    async def list(self) -> List[UserModel]:
        return list(self.users.values())

Let's sum up the interim results

Advantages of the approach:

  • The application does not depend on any specific technology, so a technology can be replaced without complex refactoring;

  • The work is carried out through interfaces rather than implementations, which lets you change the application's functionality without editing many files;

  • Components are tested via interfaces and dependency injection: you can simply substitute a fake for the injected object;

  • The project scales in a consistent style;

  • The code is reusable because it is not tied to specific implementations.

Disadvantages of the approach:

  • There are many abstractions and interfaces to write and maintain;

  • It is difficult for new participants to enter the project, as they need to understand the interaction of several levels of abstractions;

  • The code may run somewhat slower because of the extra layers of abstraction stacked on top of each other.

DDD is useful in large, long-term projects when requirements change, technologies become obsolete, and the application scales. In static projects, where tasks, their scope, and solutions do not change, using the approach is inappropriate.

You can study DDD in more detail in the book by Harry Percival and Bob Gregory, “Architecture Patterns with Python” (published in Russian as “Python Development Patterns: TDD, DDD, and Event-Driven Architecture”).

See you in the second part.

I would appreciate your support and constructive criticism!
