We write a useful service in Python to get a TIN

In this article I want to talk about how to write a useful service for obtaining a TIN using personal data (passport data). TIN of an individual is obtained using the site https://service.nalog.ru/. Similar functionality, most likely, has already been implemented somewhere and by someone. The main idea of ​​the article is to share the experience of working with Python in terms of creating a complete project using a dependency container, creating listeners for RabbitMQ and working with the MongoDB database. Working with clients of the service is implemented through RabbitMQ in the mode of continuous reading of the queue, sending the result to the output queue. The service will live in Kubernetes, which requires liveness and readiness trials. For this, a web server is used.

Photo by Christina Morillo from Pexels

Photo by Christina Morillo from Pexels

General information

We will implement the service in Python 3.10 using libraries aio-pika, fastapi, pydantic, motor and other libraries that will be specified in the pyproject.toml of the project. We use MongoDB 4+ as a database. The tax service is accessed using the aiohttp library. The project is posted in the public domain on GitHub.

The application functions as an input queue listener and a web server for returning liveness and readiness probes. When a message is received in a queue, the name of the output queue to which the response will be sent is subtracted from the reply-to header. The processing of the request is passed to the service, which checks for a similar request in the database. In the absence of data on the client, a request is made to an external service. An external service can process a certain number of messages without requiring a captcha. After exceeding the limits, which are not known for certain (but change with a general increased load), the message is placed in a dead queue and returned to processing after the time specified in the settings.

Preparatory work for the database is not required. The first time you connect to MongoDB, the necessary collections and indexes will be created.

Service communication contract

Let’s define the input message contract in JSON format:

hidden text
{
	 "requestId": str,
	 "firstName": str,
	 "lastName": str,
	 "middleName": str,
	 "birthDate": date,
	 "documentSerial": str,
	 "documentNumber": str,
	 "documentDate": date
}

All fields are intuitive. Attribute requestId must be unique within all messages, it makes sense to pass it as a string representation GUID.

The name of the output queue may be passed through the reply-to field of the message header.

The output message contract will be as follows:

hidden text
{
	"requestId": str,
	"inn": str,
	"cached": bool,
	"details": str,
	"elapsedTime": float
}

In the response, we will give the request code, the TIN itself and the time during which the service completed the request and the sign of the cached response.

Project Structure

The general structure of the project directories is as follows.

src
  |--inn_service
    |--clients
    |--connection_managers
    |--core
    |--infrastructure
       |--controllers
       |--handlers
       |--http_server
       |--queue_manager                     
    |--models    
    |--repositories
    |--serializers
    |--services    
  main.py
.env.example
.gitignore
docker-compose.yaml
pyproject.yaml

The root directory will contain project launch tools: docker-compose, linting and test launch makefile. The project itself is located in src/inn_service and contains:

  • clients – clients to connect to the actual data providers (nalog.ru and others);

  • connection_managers – infrastructure connections to the database, queues;

  • core – the general code of the application (the application itself, the container);

  • infrastructure – queue handler manager, handlers themselves, infrastructure controllers;

  • models – application models, DTO objects;

  • repositories – a repository for working with the database;

  • serializers – serializers of input requests, data to be sent to the TIN provider;

  • services – application services.

We will shift the work of creating a virtual connection to PyCharm and poetry. Brief installation command: poetry install.

Application settings

Let’s start development by creating application settings using BaseSettings from the pydantic package.

In file settings.py there will be settings.

hidden text
class Settings(BaseSettings):  
    app_name: str="INN service"  
    app_request_retry_times: int  # Количество попыток обработки внешнего запроса  
    app_request_retry_sec: int  # Время задержки в секундах перед повторной обработкой запроса  
  
    http_host: str  
    http_port: int  
    http_handler: str="asyncio"  
  
    mongo_host: str  
    mongo_port: str  
    mongo_user: str  
    mongo_pass: str  
    mongo_name: str  
    mongo_rs: Optional[str] = None  
    mongo_auth: str  
    mongo_timeout_server_select: int = 5000  
  
    rabbitmq_host: str  
    rabbitmq_port: int  
    rabbitmq_user: str  
    rabbitmq_pass: str  
    rabbitmq_vhost: str  
    rabbitmq_exchange_type: str  
    rabbitmq_prefetch_count: int  
    rabbitmq_source_queue_name: str  
  
    client_nalog_url: str  # Адрес внешнего сервиса для получения ИНН  
    client_nalog_timeout_sec: int  # Таймаут ожидания ответа от сервиса  
    client_nalog_retries: int  # Количество попыток запросов к внешнему сервису  
    client_nalog_wait_sec: int  # Время ожидания между попытками client_nalog_retries  
  
    @property  
    def mongo_dsn(self) -> str:  
        mongo_dsn = 'mongodb://{}:{}@{}:{}/{}'.format(  
            self.mongo_user,  
            self.mongo_pass,  
            self.mongo_host,  
            self.mongo_port,  
            self.mongo_auth  
        )  
  
        if self.mongo_rs:  
            mongo_dsn += f'?replicaSet={self.mongo_rs}'  
  
        return mongo_dsn  
  
    @property  
    def rabbitmq_dsn(self) -> str:  
        return 'amqp://{}:{}@{}:{}/{}'.format(  
            self.rabbitmq_user,  
            self.rabbitmq_pass,  
            self.rabbitmq_host,  
            self.rabbitmq_port,  
            self.rabbitmq_vhost  
        )

I suggest not to specify default values ​​for the settings. If something goes wrong, we will immediately see the problem. At this point, you can immediately prepare the file .env.exampleA containing the default settings for the service.

Infrastructure connections

Let’s create a connection layer to the rabbitmq, mongodb infrastructure through the aio-pika and motor components:

poetry add motor aio-pika fast fastapi uvicorn injector

The connection layer will be hosted in connection_managers and is designed to organize a connection to the database and the queue manager. Let’s add two mixins to create a mechanism for registering autostart and terminating the application. The function autostart mechanism is used at application startup to initialize the connection to RabbitMQ and MongoDB, as well as to create indexes in the database collection. In case of connection errors, the application does not start and an error is displayed in the logs.

hidden text
class StartupEventMixin(ABC):  
  
    @abstractmethod  
    def startup(self) -> Coroutine:  
        raise NotImplementedError  
  
  
class ShutdownEventMixin(ABC):  
  
    @abstractmethod  
    def shutdown(self) -> Coroutine:  
        raise NotImplementedError

Using the example of RabbitConnectionManager, we will demonstrate the implementation.

hidden text
class RabbitConnectionManager(StartupEventMixin, ShutdownEventMixin, EventLiveProbeMixin):  
    def startup(self) -> Coroutine:  
        return self.create_connection()

	def shutdown(self) -> Coroutine:  
        return self.close_connection()  
  
	async def create_connection(self) -> None:  
	    self.logger.info('Create connection RabbitMQ')  
	    try:  
	        self._connection = await connect_robust(self._dsn)  
	        self._connection.reconnect_callbacks.add(self.on_connection_restore)  
	        self._connection.close_callbacks.add(self.on_close_connection)  
	        self.connected = True  
	    except ConnectionError as exc:  
	        err_message = f'Rabbit connection problem: {exc}'  
	        self.logger.error(err_message)  
	        raise ConnectionError(err_message)  
	  
	async def close_connection(self) -> None:  
	    if self._connection:  
	        await self._connection.close()
	
	# ... некоторый код пропущен, полная версия на гитхабе
	
	def on_close_connection(self, *args):  
	    self.logger.error('Lost connection to RabbitMQ...')  
	    self.connected = False  
	  
	def on_connection_restore(self, *args):  
	    self.logger.info('Connection to RabbitMQ has been restored...')  
	    self._channel = None  
	    self._exchange = None  
	    self.connected = True

When connecting to RabbitMQ, callback functions are set to respond to connection loss and restore it.

Handler manager

The handler manager is designed to manage listeners (consumers) of queues. The project uses the concept of “dead queues”, which allows you to postpone the message for some time and return to processing it later. The reason for this may be a long response from the provider, temporary errors of the provider, the requirement to enter captcha due to load. The mechanism of dead queues is technically analyzed in sufficient detail in the article Delayed Retrays by RabbitMQ. Each queue handler must store and return an indication of the use of retrays, the time between returns to the main queue for processing, and the name of the queue it plans to listen to. The main handler code is in run_handler. The function is expected True upon successful processing or an irreparable request error (incorrect message body) and Falseif the request could not be processed but should be retried later.

Base handler code:

hidden text
class BaseHandler(ABC):  
    def __init__(  
            self,  
            settings: Settings,  
            logger: AppLogger,  
            rabbitmq_connection: RabbitConnectionManager  
    ) -> None:  
        self.settings = settings  
        self.logger = logger  
        self.rabbitmq_connection = rabbitmq_connection  
  
    @abstractmethod  
    def get_use_retry(self) -> bool:  
        raise NotImplementedError  
  
    def get_retry_ttl(self) -> int:  
        return 0  
  
    @abstractmethod  
    def get_source_queue(self) -> str:  
        raise NotImplementedError  
  
    def convert_seconds_to_mseconds(self, value: int) -> int:  
        return value * 1000  
  
    @abstractmethod  
    async def run_handler(  
            self,  
            message: dict,  
            request_id: Optional[str],  
            result_queue: Optional[str],  
            count_retry: Optional[int] = 0  
    ) -> bool:  
        raise NotImplementedError

Actually the only inheritor of the class RequestHandlerwhich implements the reception and processing of the message:

hidden text
class RequestHandler(BaseHandler):  
    def __init__(  
            self,  
            settings: Settings,  
            logger: AppLogger,  
            rabbitmq_connection: RabbitConnectionManager,  
            service: InnService  
    ) -> None:  
        super().__init__(settings, logger, rabbitmq_connection)  
        self.source_queue_name = self.settings.rabbitmq_source_queue_name  
        self.retry_times = self.settings.app_request_retry_times  
        self.retry_sec = self.settings.app_request_retry_sec  
        self.service = service  
  
    def get_source_queue(self) -> str:  
        return self.source_queue_name  
  
    def get_use_retry(self) -> bool:  
        return True  
  
    def get_retry_ttl(self) -> int:  
        return self.retry_sec  
  
    async def run_handler(  
            self,  
            message: dict,  
            request_id: Optional[str],  
            result_queue: Optional[str],  
            count_retry: Optional[int] = 0  
    ) -> bool:  
        if count_retry > self.retry_times:  
            self.logger.warning(f'Request {request_id} was rejected by excess attempts {self.retry_times} times')  
            return True  
  
        self.logger.info(f'Get request {request_id} for response {result_queue}')  
  
        client_data = RequestSerializer.parse_obj(message)  
  
        response = await self.service.get_client_inn(client_data)  
  
        if result_queue:  
            json_message = response.dict()  
            await self.rabbitmq_connection.send_data_by_queue(json_message, result_queue)  
  
        return True

Upon receipt of a message, we check the number of repeated hits in the queue through the parameter count_retry. In case of exceeding, we send the message processing status (error) to the output queue and suspend the processing of this message. RequestSerializer.parse_obj(message) not wrapped in a try…except block because the queue manager handles ValidationError message conversion errors.

Working with the database

The choice fell on MongoDB because of ease of use, lack of migrations, flexible data processing scheme. The task does not require the storage of dependent data, the design of relationships between tables. To work with data, we will use the Repository pattern.

The base repository contains functions for working with data, indexes in Mongo notation, and in specific classes we implement the functions necessary for the service. Indexes are created when the application starts in the background (background flag), for which the StartupEventMixin mixin implementation is used. Dataset queries support pagination and sorting.

A concrete class is created for each separate collection. The project has one repository for client requests. The data storage model is located in the models directory and is called ClientDataModel. The client model was created with typing supported by MongoDB (datetime instead of date), the created_at attribute is set to generate a default value via default_factory. Also added to the model is a function for calculating the processing time of a request. elapsed_time and a class method for creating an object from a client request.

hidden text
class ClientDataModel(BaseModel):  
    created_at: datetime = Field(default_factory=datetime.utcnow)  
    request_id: str  
    first_name: str  
    last_name: str  
    middle_name: str  
    birth_date: datetime  
    birth_place: str = Field(default="")  
    passport_num: str  
    document_date: datetime  
    executed_at: Optional[datetime]  
    inn: Optional[str]  
    error: Optional[str]  
  
    @classmethod  
    def create_from_request(cls, request: RequestMqSerializer) -> 'ClientDataModel':  
        return ClientDataModel(  
            request_id=request.request_id,  
            first_name=request.first_name,  
            last_name=request.last_name,  
            middle_name=request.middle_name,  
            birth_date=datetime.combine(request.birth_date, datetime.min.time()),  
            passport_num='{} {}'.format(request.document_serial, request.document_number),  
            document_date=datetime.combine(request.document_date, datetime.min.time()),  
        )  
  
    @property  
    def elapsed_time(self) -> float:  
        end = self.executed_at or datetime.utcnow()  
        return (end - self.created_at).total_seconds()

Base repository code:

hidden text
class BaseRepository(StartupEventMixin):  
  
    def __init__(self, mongodb_connection_manager: MongoConnectionManager, setting: Settings) -> None:  
        self.mongodb_connection_manager = mongodb_connection_manager  
        self.db_name = setting.mongo_name  
  
    @property  
    def collection_name(self) -> str:  
        raise NotImplementedError  
  
    @property  
    def collection_indexes(self) -> Iterable[IndexDef]:  
        raise NotImplementedError  
  
    def startup(self) -> Coroutine:  
        return self.create_indexes()  
  
    async def create_index(self, field_name: str, sort_id: int) -> None:  
        connection = await self.mongodb_connection_manager.get_connection()  
        collection = connection[self.db_name][self.collection_name]  
        await collection.create_index([(field_name, sort_id), ], background=True)  
  
    async def create_indexes(self) -> None:  
        tasks = []  
        for index_item in self.collection_indexes:  
            tasks.append(self.create_index(index_item.name, index_item.sort))  
        asyncio.ensure_future(asyncio.gather(*tasks))  
  
    async def get_one_document(self, criteria: dict) -> Optional[dict]:  
        connection = await self.mongodb_connection_manager.get_connection()  
        collection = connection[self.db_name][self.collection_name]  
        return await collection.find_one(criteria)  
  
    async def get_list_document(  
            self,  
            criteria: dict,  
            sort_criteria: Optional[list] = None,  
            limit: Optional[int] = 0,  
            skip: Optional[int] = 0,  
    ) -> List[dict]:  
        if not sort_criteria:  
            sort_criteria = []  
        connection = await self.mongodb_connection_manager.get_connection()  
        cursor = connection[self.db_name][self.collection_name].find(  
            criteria,  
            limit=limit,  
            skip=skip,  
            sort=sort_criteria  
        )  
  
        result = list()  
        async for data in cursor:  
            result.append(data)  
        return result  
  
    async def save_document(self, data: dict) -> str:  
        connection = await self.mongodb_connection_manager.get_connection()  
        result = await connection[self.db_name][self.collection_name].insert_one(data)  
        return result.inserted_id  
  
    async def update_document(self, criteria: dict, data: dict) -> None:  
        connection = await self.mongodb_connection_manager.get_connection()  
        await connection[self.db_name][self.collection_name].update_one(criteria, {'$set': data})

service layer

The service layer does all the necessary processing on the data.

  • accessing the database to search for a similar request (request_id and passport data);

  • return the result if the data was found;

  • make an API request;

  • save the query result to the database;

  • return a response.

In the service layer, I tried to abstract from working with the infrastructure. The response is returned to the calling function, which must know where to return the response. In this case, the queue manager “knows” where to reply due to the presence of the reply-to field in the request header. The return value is formatted as a DTO object (RequestDTO).

Class code InnService:

hidden text
class InnService:  
    def __init__(  
            self,  
            settings: Settings,  
            logger: AppLogger,  
            client: NalogApiClient,  
            storage: RequestRepository  
    ) -> None:  
        self.settings = settings  
        self.logger = logger  
        self.client = client  
        self.storage_repository = storage  
  
    async def get_client_inn_from_storage(self, client_data: RequestSerializer) -> Optional[RequestModel]:  
        client_passport = f'{client_data.document_serial} {client_data.document_number}'  
        client_request = await self.storage_repository.find_request(client_passport, client_data.request_id)  
        return client_request  
  
    def update_status(self, model: RequestModel, inn: str, error: str) -> None:  
        model.inn = inn  
        model.error = error  
  
    async def get_client_inn(self, client_data: RequestSerializer) -> RequestDTO:  
        """Получение клиентского ИНН"""  
        start_process = datetime.utcnow()  
        model = RequestModel.create_from_request(client_data)  
  
        # Получить данные из БД  
        existing_data = await self.get_client_inn_from_storage(client_data)  
        if existing_data:  
            elapsed_time = (datetime.utcnow() - start_process).total_seconds()  
            return RequestDTO(  
                request_id=client_data.request_id,  
                inn=existing_data.inn,  
                elapsed_time=elapsed_time,  
                cashed=True  
            )  
  
        # Сделать фактический запрос в Nalog API  
        request = NalogApiRequestSerializer.create_from_request(client_data)  
        error, result = None, ''  
        try:  
            result = await self.client.send_request_for_inn(request)  
        except NalogApiClientException as exception:  
            self.logger.error('Error request to Nalog api service', details=str(exception))  
            error = str(exception)  
  
        self.update_status(model, result, error)  
        await self.storage_repository.save_request(model)  
  
        return RequestDTO(  
            request_id=model.request_id,  
            inn=model.inn,  
            details=model.error,  
            elapsed_time=model.elapsed_time  
        )

The second service in the application is the infrastructure polling service for health-check. Infrastructure managers that need to be monitored should inherit from the mixin EventLiveProbeMixin and implement the function is_connected.

Client

The NalogApiClient client is designed to perform a POST request to https://service.nalog.ru/inn.do and parsing the response status. The function of directly formatting the request is wrapped in a retry request repeater decorator when errors occur. Repeater settings in the general settings of the application.

hidden text
class NalogApiClient:  
    CLIENT_EXCEPTIONS = (  
        NalogApiClientException,  
        aiohttp.ClientProxyConnectionError,  
        aiohttp.ServerTimeoutError,  
    )  
  
    def __init__(self, settings: Settings, logger: AppLogger):  
        self.nalog_api_service_url = settings.client_nalog_url  
        self.request_timeout = settings.client_nalog_timeout_sec  
        self.retries_times = settings.client_nalog_retries  
        self.retries_wait = settings.client_nalog_wait_sec  
        self.logger = logger  
        self.timeout = aiohttp.ClientTimeout(total=self.request_timeout)  
  
    @property  
    def _headers(self):  
        return {  
            "Accept": "application/json, text/javascript, */*; q=0.01",  
            "Accept-Language": "ru-RU,ru",  
            "Connection": "keep-alive",  
            "Origin": "https://service.nalog.ru",  
            "Referer": self.nalog_api_service_url,  
            "Sec-Fetch-Dest": "empty",  
            "Sec-Fetch-Mode": "cors",  
            "Sec-Fetch-Site": "same-origin",  
            "Sec-GPC": "1",  
            "X-Requested-With": "XMLHttpRequest",  
        }  
  
    async def send_request_for_inn(self, nalog_api_request: NalogApiRequestSerializer) -> Optional[str]:  
        self.logger.debug(f'Request to nalog api service for {nalog_api_request.client_fullname}')  
  
        form_data = nalog_api_request.dict(by_alias=True)  
  
        @retry(self.CLIENT_EXCEPTIONS, logger=self.logger, attempts=self.retries_times, wait_sec=self.retries_wait)  
        async def make_request(client_session: aiohttp.ClientSession):  
            async with client_session.post(url=self.nalog_api_service_url, data=form_data) as response:  
                if response.status not in [http.HTTPStatus.OK, http.HTTPStatus.NOT_FOUND]:  
                    response_text = await response.text()  
                    raise NalogApiClientException(response_text)  
                data = await response.json()  
                code = data.get('code')  
                captcha_required = data.get('captchaRequired')  
                if captcha_required:  
                    raise NalogApiClientException(f'Captcha required for request {nalog_api_request.client_fullname}')  
                if code == 0:  
                    return 'no inn'  
                elif code == 1:  
                    return data.get('inn')  
                else:  
                    raise NalogApiClientException(f'Unable to parse response! Details: {response}')  
  
        async with aiohttp.ClientSession(timeout=self.timeout, headers=self._headers) as session:  
            return await make_request(session)

Container

Container is designed to build the necessary dependencies and pass them to the application. Our container is assembled in the ApplicationContainer class. All dependencies are forwarded as @singletons and registered as dependency providers of @provider types provided by the library injector. When writing tests, you need to prepare another container with actual fake or stub objects.

The main interest in working with the container is concentrated in the class ContainerManagerwhich is used to test the implementation of the mixin EventSubscriberMixin And EventLiveProbeMixin. Function get_event_collection generates lists of callback functions for starting and exiting the application. Actually passing through the lists and calling callback functions is implemented in the following functions: run_startup And run_shutdown.

hidden text
class ContainerManager:  
  
    def __init__(self, cls_container: Type[Container]) -> None:  
        self._container = Injector(cls_container())  
        self._bindings = self._container.binder._bindings  
  
    def get_container(self) -> Injector:  
        return self._container  
  
    def get_live_probe_handlers(self) -> List[Type[Callable]]:  
        result = []  
        binding_collection = [binding for binding in self._bindings]  
        for binding in binding_collection:  
            if issubclass(binding, EventLiveProbeMixin):  
                binding_obj = self._container.get(binding)  
                result.append(binding_obj.is_connected)  
        return result  
  
    def get_startup_handlers(self):  
        handlers = []  
        binding_collection = [binding for binding in self._bindings]  
        for binding in binding_collection:  
            if issubclass(binding, StartupEventMixin):  
                binding_obj = self._container.get(binding)  
                handlers.append(binding_obj.startup())  
        return handlers  
  
    def get_shutdown_handlers(self):  
        handlers = []  
        binding_collection = [binding for binding in self._bindings]  
        for binding in binding_collection:  
            if issubclass(binding, ShutdownEventMixin):  
                binding_obj = self._container.get(binding)  
                handlers.append(binding_obj.shutdown())  
        return handlers  
  
    async def run_startup(self) -> None:  
        exception = None  
        for handler in self.get_startup_handlers():  
            if exception:  
                handler.close()  
            else:  
                try:  
                    await handler  
                except Exception as exc:  
                    exception = exc  
  
        if exception is not None:  
            raise exception  
  
    async def run_shutdown(self) -> None:  
        handlers = []  
        for handler in self.get_shutdown_handlers():  
            handlers.append(handler)  
        await asyncio.gather(*handlers)

Actually the container itself, in which the necessary instances of classes are initialized. When writing tests, a similar container will be created.

hidden text
class ApplicationContainer(Container):  
  
    @singleton  
    @provider    
    def provide_settings(self) -> Settings:  
        return Settings()  
  
	# ... немного кода пропущено

    @singleton  
    @provider    
    def provide_mongodb_connection(self, settings: Settings, logger: AppLogger) -> MongoConnectionManager:  
        return MongoConnectionManager(settings, logger)  
  
    @singleton  
    @provider    
    def provide_rabbitmq_connection(self, settings: Settings, logger: AppLogger) -> RabbitConnectionManager:  
        return RabbitConnectionManager(settings, logger)  
  
    @singleton  
    @provider    
    def provide_nalog_api_client(self, settings: Settings, logger: AppLogger) -> NalogApiClient:  
        return NalogApiClient(settings, logger)  
  
    @singleton  
    @provider    
    def provide_request_repository(self, settings: Settings, mongo_connection: MongoConnectionManager) -> RequestRepository:  
        return RequestRepository(mongo_connection, settings)

Application

The main task of the application is to put everything together and start a common thread of execution. The application assembly code is extremely simple, the container manager performs the initialization of classes. The assembly of the application is carried out in the following steps:

  • receiving a container, passing it to the container manager;

  • event_loop initialization;

  • adding handlers for queues;

  • launching initializers for the infrastructure layer (implementing startup mixins);

  • launching the FastAPI web server to return health-check;

  • enable global error handler.

hidden text
class Application:  
  
    def __init__(self, cls_container: Type[Container]) -> None:  
        self.loop = asyncio.get_event_loop()  
        self.container_manager = ContainerManager(cls_container)  
        self.container = self.container_manager.get_container()  
        self.settings = self.container.get(Settings)  
        self.logger = self.container.get(AppLogger)  
        self.live_probe_service = self.container.get(LiveProbeService)  
        self.queue_manager = self.container.get(QueueManager)  
        self.app_name = self.settings.app_name  
        self.http_server = None  
  
    def init_application(self):  
        self.http_server = ServerAPIManager(self.container)  
  
        request_handler = self.container.get(RequestHandler)  
        self.queue_manager.add_handler(request_handler)  
  
        live_probe_handlers = self.container_manager.get_live_probe_handlers()  
        for handler in live_probe_handlers:  
            self.live_probe_service.add_component(handler)  
  
    def run(self) -> None:  
        self.logger.info(f'Starting application {self.app_name}')  
  
        self.init_application()  
  
        try:  
            self.loop.run_until_complete(self.container_manager.run_startup())  
  
            tasks = asyncio.gather(  
                self.http_server.serve(),  
                self.queue_manager.run_handlers_async(),  
            )  
            self.loop.run_until_complete(tasks)  
  
            self.loop.run_forever()  
        except BaseException as exception:  
            exit(1)  
        finally:  
            self.loop.run_until_complete(self.container_manager.run_shutdown())  
  
            self.loop.close()  
            self.logger.info('Application disabled')

The application starts from the main script using a small typer library. The little library has the ability to conveniently handle command line options.

hidden text
import typer  
from core.application import Application  
from app_container import ApplicationContainer  
  
  
def main():  
    try:  
        application = Application(ApplicationContainer)  
        application.run()  
    except BaseException as exc:  
        typer.echo(f'Error starting application. Details: {str(exc)}')  
  
  
if __name__ == "__main__":  
    typer.run(main)

How to run it all?

The project contains a docker-compose file to build. You need to copy the file .env.example to file .env .

docker compose build
docker compose up

After executing these commands, a mongodb instance will be launched on port 27017 and rabbitmq on port 5672 with an admin panel on 15672. The RabbitMQ admin panel can be accessed at http://localhost:15672. In the queues section, you need to create a new queue to which the results of the service will be sent and bind it to the default exchange (direct).

To be continued

The article deals with the topic of developing an application in Python using queues, a dependency container, and health-check support. I propose to discuss the architecture in the comments, and then continue to develop the service. In the next iterations, I plan to add a hypothetical non-free client, which we will use after a certain number of requests in free service. And finally write tests.

Materials that may be useful for understanding the material:

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *