Experience of implementing DDD in a project on FastAPI – Part 2

Hello, Habrites!

I'm Dima, a Python developer from 21 YARDa service for finding construction contractors.

This is the second part of the DDD series. In it, I will tell you how to add an event-oriented architecture to a project.

Table of contents

You can find the code for the experimental application in the repository at link. More about DDD and patterns Repository And Unit of Work read the first part at the link.

CQRS

The first step to implementing an event-oriented architecture is to implement CQRS (The Command and Query Responsibility Segregation). Let's apply this principle and separate data reading and its creation, updating, and deletion.

The view class will handle reading the data (src/users/entrypoints/views.py):

class UsersViews:
	"""
	Views related to users, which purpose is to return information upon read requests,
	due to the fact that write requests (represented by commands) are different from read requests.
 
	# TODO At current moment uses repositories pattern to retrieve data. In future can be changed on raw SQL
	# TODO for speed-up purpose
	"""
 
	def __init__(self, uow: UsersUnitOfWork) -> None:
    	self._uow: UsersUnitOfWork = uow
 
	async def get_user_account(self, user_id: int) -> UserModel:
    	users_service: UsersService = UsersService(self._uow)
    	user: UserModel = await users_service.get_user_by_id(id=user_id)
    	return user
 
	async def get_user_statistics(self, user_id: int) -> UserStatisticsModel:
    	users_service: UsersService = UsersService(self._uow)
    	user_statistics: UserStatisticsModel = await users_service.get_user_statistics_by_user_id(user_id=user_id)
    	return user_statistics
 
	async def get_all_users(self) -> List[UserModel]:
    	users_service: UsersService = UsersService(self._uow)
    	users: List[UserModel] = await users_service.get_all_users()
    	return users

The view implementation uses the service layer from the first article. If you need to speed up database queries, use pure SQL instead of the service layer, abstractions, and ORM.

Views are called from dependencies with virtually no changes (src/users/entrypoints/dependencies.py):

async def get_my_account(token: str = Depends(oauth2_scheme)) -> UserModel:
	jwt_data: JWTDataModel = await parse_jwt_token(token=token)
	users_views: UsersViews = UsersViews(uow=SQLAlchemyUsersUnitOfWork())
	return await users_views.get_user_account(user_id=jwt_data.user_id)

Requests to change data from the user will be represented as separate command objects.

Let's write an abstract command (src/core/interfaces/commands.py). Other commands in the project will inherit from it.

@dataclass(frozen=True)
class AbstractCommand(ABC):
	"""
	Base command, from which any domain command should be inherited.
	Commands represents external operations, which must be executed.
	"""
 
	async def to_dict(
        	self,
        	exclude: Optional[Set[str]] = None,
        	include: Optional[Dict[str, Any]] = None
	) -> Dict[str, Any]:
 
    	"""
    	Create a dictionary representation of the model.
 
    	exclude: set of model fields, which should be excluded from dictionary representation.
    	include: set of model fields, which should be included into dictionary representation.
    	"""
 
    	data: Dict[str, Any] = asdict(self)
    	if exclude:
        	for key in exclude:
            	try:
                	del data[key]
            	except KeyError:
                	pass
 
    	if include:
        	data.update(include)
 
    	return data

During the processing of commands from the user, internal events may occur. Internal events differ from external events in the reason for their occurrence and goals. Let us express them through the event object.

Let's introduce an abstract event (src/core/interfaces/events.py), from which other internal events are inherited:

@dataclass(frozen=True)
class AbstractEvent(ABC):
	"""
	Base event, from which any domain event should be inherited.
	Events represents internal operations, which may be executed.
	"""
 
	async def to_dict(
        	self,
        	exclude: Optional[Set[str]] = None,
        	include: Optional[Dict[str, Any]] = None
	) -> Dict[str, Any]:
 
    	"""
    	Create a dictionary representation of the model.
 
    	exclude: set of model fields, which should be excluded from dictionary representation.
    	include: set of model fields, which should be included into dictionary representation.
    	"""
 
    	data: Dict[str, Any] = asdict(self)
    	if exclude:
        	for key in exclude:
            	try:
                	del data[key]
            	except KeyError:
                	pass
 
    	if include:
        	data.update(include)
 
    	return data

Event-driven architecture

The basis of event-driven architecture is an action that changes the state of the application. An event can be initiated by an external or internal source: it can be a command or an event.

Events can cascade each other. All events must be processed.
Let's consider a situation where a user rates another user. The latter should receive a notification. This is implemented by creating an event.

The functionality of the application can be written line by line. But then it will be difficult to scale the project and change the implementations. Teams and events help to avoid this.

Let's consider the command to vote for a user (src/users/domain/commands.py):


@dataclass(frozen=True)
class VoteForUserCommand(AbstractCommand):
	voted_for_user_id: int
	voting_user_id: int
	liked: bool
	disliked: bool

An event is a DTO (Data Transfer Object) – an object that stores data or state, but has no methods to change it. DTOs are passed between application layers.

To process DTOs, handlers or event handlers are used. Let's introduce abstract classes of event and command handlers (src/core/interfaces/handlers.py):

class AbstractHandler(ABC):
	
	@abstractmethod
	def __init__(self, uow: AbstractUnitOfWork) -> None:
    	raise NotImplementedError
 
 
class AbstractEventHandler(AbstractHandler, ABC):
	"""
	Abstract event handler class, from which every event handler should be inherited from.
	"""
 
	@abstractmethod
	async def __call__(self, event: AbstractEvent) -> None:
    	raise NotImplementedError
 
 
class AbstractCommandHandler(AbstractHandler, ABC):
	"""
	Abstract command handler class, from which every command handler should be inherited from.
	"""
 
	@abstractmethod
	async def __call__(self, command: AbstractCommand) -> Any:
    	raise NotImplementedError
 

When initialized, the handler receives Unit of Work. It is needed for two reasons:

  • to initialize the service that the handler will use;

  • to store events.

Let's add event handling to the base class Unit of Work (src/core/interfaces/units_of_work.py):

class AbstractUnitOfWork(ABC):
	"""
	Interface for any units of work, which would be used for transaction atomicity, according DDD.
	"""
 
	def __init__(self):
    	# Creating events storage for retrieve them in MessageBus:
    	self._events: List[AbstractEvent] = []
 
	...
 
	async def add_event(self, event: AbstractEvent) -> None:
    	self._events.append(event)
 
	def get_events(self) -> Generator[AbstractEvent, None, None]:
    	"""
    	Using generator to get elements only when they needed.
    	Also can not use self._events directly, not to run events endlessly.
    	"""
 
    	while self._events:
        	yield self._events.pop(0)

Let's consider the handler of the command for rating a user (src/users/service_layer/handlers/command_handlers.py):

class VoteForUserCommandHandler(UsersCommandHandler):
 
	async def __call__(self, command: VoteForUserCommand) -> UserStatisticsModel:
    	"""
    	1) Checks, if a vote is appropriate.
    	2) Likes or dislikes user, depends on from command data.
    	3) Creates event, signaling that user has voted.
    	"""
 
    	async with self._uow as uow:
        	if command.voting_user_id == command.voted_for_user_id:
            	raise UserCanNotVoteForHimSelf
 
        	users_service: UsersService = UsersService(uow=uow)
        	if await users_service.check_if_user_already_voted(
                    voting_user_id=command.voting_user_id,
                    voted_for_user_id=command.voted_for_user_id
        	):
            	raise UserAlreadyVotedError
 
        	user_statistics: UserStatisticsModel
        	if command.liked:
            	user_statistics = await users_service.like_user(
                	voting_user_id=command.voting_user_id,
                    voted_for_user_id=command.voted_for_user_id
            	)
        	else:
            	user_statistics = await users_service.dislike_user(
                	voting_user_id=command.voting_user_id,
                    voted_for_user_id=command.voted_for_user_id
            	)
 
        	voted_for_user: UserModel = await users_service.get_user_by_id(id=command.voted_for_user_id)
        	voting_user: UserModel = await users_service.get_user_by_id(id=command.voting_user_id)
        	await uow.add_event(
            	UserVotedEvent(
                	liked=command.liked,
                	disliked=command.disliked,
                    voted_for_user_email=voted_for_user.email,
                    voted_for_user_username=voted_for_user.username,
                    voting_user_username=voting_user.username,
                    voting_user_email=voting_user.email,
            	)
        	)
 
        	return user_statistics
 

Part of the handler logic was transferred from the dependency that was in the first article. After that, we added the creation of a notification event (src/users/domain/events.py):

@dataclass(frozen=True)
class UserVotedEvent(AbstractEvent):
	voted_for_user_email: str
	voted_for_user_username: str
	voting_user_email: str
	voting_user_username: str
	liked: bool
	disliked: bool

Message bus

Creating an event requires processing it. Unit of Work cannot do this, since its task is to maintain the atomicity of transactions. If it were to additionally process events, the single responsibility principle would be violated. SOLID.

The message bus will handle the processing of events (src/core/messagebus.py):

class MessageBus:
 
	def __init__(
    	self,
    	uow: AbstractUnitOfWork,
    	event_handlers: Dict[Type[AbstractEvent], List[AbstractEventHandler]],
    	command_handlers: Dict[Type[AbstractCommand], AbstractCommandHandler],
	) -> None:
 
    	self._uow: AbstractUnitOfWork = uow
    	self._event_handlers: Dict[Type[AbstractEvent], List[AbstractEventHandler]] = event_handlers
    	self._command_handlers: Dict[Type[AbstractCommand], AbstractCommandHandler] = command_handlers
    	self._queue: Queue = Queue()
    	self._command_result: Any = None
 
	async def handle(self, message: Message) -> None:
    	self._queue.put(message)
    	while not self._queue.empty():
        	message = self._queue.get()
        	if isinstance(message, AbstractEvent):
            	await self._handle_event(event=message)
        	elif isinstance(message, AbstractCommand):
            	await self._handle_command(command=message)
        	else:
            	raise MessageBusMessageError
 
	async def _handle_event(self, event: AbstractEvent) -> None:
    	handler: AbstractEventHandler
    	for handler in self._event_handlers[type(event)]:
        	await handler(event)
        	for event in self._uow.get_events():
            	self._queue.put_nowait(event)
 
	async def _handle_command(self, command: AbstractCommand) -> None:
    	handler: AbstractCommandHandler = self._command_handlers[type(command)]
    	self._command_result = await handler(command)
    	for event in self._uow.get_events():
        	self._queue.put_nowait(event)
 
	@property
	def command_result(self) -> Any:
    	return self._command_result

When initialized, the message bus receives mapped event types with handler instances.

An event can have several handlers. Each handler performs one action. For example, when a user is rated, different handlers will send notifications to email and phone. Teams always have one handler.

The main method of the message bus, handle(), receives an object Message – combining external and internal application events (src/core/interfaces/messages.py):

Message = Union[AbstractEvent, AbstractCommand]

Events are queued and processed in the order they occur. If events are not equal, a priority queue can be used.

Dependency Injection

The message bus initialization could be handled by a dependency layer, but then there would be a lot of duplicate code, which violates the DRY (Don't Repeat Yourself) principle.

Let's create a class that will provide a message bus, which means it will initialize it with prepared handlers (src/core/bootstrap.py):

class Bootstrap:
	"""
	Bootstrap class for Dependencies Injection purposes.
	"""
 
	def __init__(
        	self,
        	uow: AbstractUnitOfWork,
        	events_handlers_for_injection: Dict[Type[AbstractEvent], List[Type[AbstractEventHandler]]],
        	commands_handlers_for_injection: Dict[Type[AbstractCommand], Type[AbstractCommandHandler]],
        	dependencies: Optional[Dict[str, Any]] = None
	) -> None:
 
    	self._uow: AbstractUnitOfWork = uow
    	self._dependencies: Dict[str, Any] = {'uow': self._uow}
    	self._events_handlers_for_injection: Dict[Type[AbstractEvent], List[Type[AbstractEventHandler]]] = (
        	events_handlers_for_injection
    	)
    	self._commands_handlers_for_injection: Dict[Type[AbstractCommand], Type[AbstractCommandHandler]] = (
        	commands_handlers_for_injection
    	)
 
    	if dependencies:
            self._dependencies.update(dependencies)
 
	async def get_messagebus(self) -> MessageBus:
    	"""
    	Makes necessary injections to commands handlers and events handlers for creating appropriate messagebus,
    	after which returns messagebus instance.
    	"""
 
    	injected_event_handlers: Dict[Type[AbstractEvent], List[AbstractEventHandler]] = {
        	event_type: [
            	await self._inject_dependencies(handler=handler)
            	for handler in event_handlers
        	]
        	for event_type, event_handlers in self._events_handlers_for_injection.items()
    	}
 
    	injected_command_handlers: Dict[Type[AbstractCommand], AbstractCommandHandler] = {
        	command_type: await self._inject_dependencies(handler=handler)
        	for command_type, handler in self._commands_handlers_for_injection.items()
    	}
 
    	return MessageBus(
        	uow=self._uow,
            event_handlers=injected_event_handlers,
            command_handlers=injected_command_handlers,
    	)
 
	async def _inject_dependencies(
        	self,
        	handler: Union[Type[AbstractEventHandler], Type[AbstractCommandHandler]]
	) -> Union[AbstractEventHandler, AbstractCommandHandler]:
 
    	"""
    	Inspecting handler to know its signature and init params, after which only necessary dependencies will be
    	injected to the handler.
    	"""
 
    	params: MappingProxyType[str, inspect.Parameter] = inspect.signature(handler).parameters
    	handler_dependencies: Dict[str, Any] = {
        	name: dependency
        	for name, dependency in self._dependencies.items()
        	if name in params
    	}
    	return handler(**handler_dependencies)

Dependency injection method _inject_dependencies() using a module inspect checks the signature of each handler. The handler receives only those dependencies that match its signature.

Let's put it all together in dependencies (src/users/entrypoints/dependencies.py):

async def like_user(user_id: int, token: str = Depends(oauth2_scheme)) -> UserStatisticsModel:
	jwt_data: JWTDataModel = await parse_jwt_token(token=token)
	bootstrap: Bootstrap = Bootstrap(
    	uow=SQLAlchemyUsersUnitOfWork(),
        events_handlers_for_injection=EVENTS_HANDLERS_FOR_INJECTION,
        commands_handlers_for_injection=COMMANDS_HANDLERS_FOR_INJECTION
	)
 
	messagebus: MessageBus = await bootstrap.get_messagebus()
	await messagebus.handle(
    	VoteForUserCommand(
        	voted_for_user_id=user_id,
        	voting_user_id=jwt_data.user_id,
        	liked=True,
        	disliked=False
    	)
	)
 
	return messagebus.command_result

When changes occur in the internal layers of the application, you do not need to make changes to the dependencies. If the business logic changes, you can add a new handler for the event or add a new event in the event configuration file (src/users/service_layer/handlers/init.py):

EVENTS_HANDLERS_FOR_INJECTION: Dict[Type[AbstractEvent], List[Type[AbstractEventHandler]]] = {
	UserVotedEvent: [SendVoteNotificationMessageEventHandler],
}
 
COMMANDS_HANDLERS_FOR_INJECTION: Dict[Type[AbstractCommand], Type[AbstractCommandHandler]] = {
	RegisterUserCommand: RegisterUserCommandHandler,
	VoteForUserCommand: VoteForUserCommandHandler,
	VerifyUserCredentialsCommand: VerifyUserCredentialsCommandHandler,
}

Conclusions

Implementation of event-oriented architecture increases the flexibility and scalability of the application. New functionality does not require major changes in the code, it is enough to create a new event and handlers for it.

The downside of implementing event-driven architecture is that it increases the complexity of the application. This makes it more difficult for new people to get involved in the project. Therefore, the use of event-driven architecture should be justified by the complexity of the application, its growth potential, and its tendency to change.

You can study the patterns and idioms described in the article in more detail in the book by Percival G., Gregory B. – “Python Development Patterns: TDD, DDD, and Event-Driven Architecture”.

See you in the next episodes.
I would appreciate your support and constructive criticism!

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *