Writing a useful Python service to get a TIN
In this article I want to show how to write a useful service that obtains a TIN (taxpayer identification number) from personal (passport) data. An individual's TIN is looked up through the site https://service.nalog.ru/. Similar functionality has most likely already been implemented by someone somewhere. The main idea of the article is to share experience of building a complete Python project that uses a dependency container, RabbitMQ listeners and the MongoDB database. Clients interact with the service through RabbitMQ: the service continuously reads an input queue and sends the result to an output queue. The service will live in Kubernetes, which requires liveness and readiness probes; a web server is used for this.
General information
We will implement the service in Python 3.10 using the aio-pika, fastapi, pydantic and motor libraries, plus others listed in the project's pyproject.toml. We use MongoDB 4+ as the database. The tax service is accessed using the aiohttp library. The project is publicly available on GitHub.
The application functions as an input queue listener and as a web server that answers liveness and readiness probes. When a message arrives in the queue, the name of the output queue for the response is read from the reply-to header. Processing of the request is handed to the service, which first checks the database for a similar request. If there is no data on the client, a request is made to the external service. The external service can process a certain number of messages without requiring a captcha. Once the limits are exceeded (they are not known for certain and change when the overall load increases), the message is placed in a dead-letter queue and returned to processing after the time specified in the settings.
Preparatory work for the database is not required. The first time you connect to MongoDB, the necessary collections and indexes will be created.
Service communication contract
Let’s define the input message contract in JSON format:
{
    "requestId": str,
    "firstName": str,
    "lastName": str,
    "middleName": str,
    "birthDate": date,
    "documentSerial": str,
    "documentNumber": str,
    "documentDate": date
}
All fields are self-explanatory. The requestId attribute must be unique across all messages; it makes sense to pass it as the string representation of a GUID.
The name of the output queue can be passed in the reply-to field of the message header.
The output message contract will be as follows:
{
    "requestId": str,
    "inn": str,
    "cached": bool,
    "details": str,
    "elapsedTime": float
}
The response contains the request ID, the TIN itself, a flag showing whether the response came from the cache, error details if there are any, and the time the service took to complete the request.
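To make the two contracts concrete, here is a minimal standard-library sketch of decoding an input message and encoding an output one. The project itself uses pydantic serializers for this; the names InnRequest, parse_request and build_response are hypothetical, and ISO 8601 date strings are an assumption, since the contract only says "date".

```python
import json
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass
class InnRequest:
    """Input message; fields mirror the JSON contract in snake_case."""
    request_id: str
    first_name: str
    last_name: str
    middle_name: str
    birth_date: date
    document_serial: str
    document_number: str
    document_date: date


def parse_request(body: bytes) -> InnRequest:
    """Decode a JSON message body (dates assumed to be ISO 8601 strings)."""
    raw = json.loads(body)
    return InnRequest(
        request_id=raw["requestId"],
        first_name=raw["firstName"],
        last_name=raw["lastName"],
        middle_name=raw["middleName"],
        birth_date=date.fromisoformat(raw["birthDate"]),
        document_serial=raw["documentSerial"],
        document_number=raw["documentNumber"],
        document_date=date.fromisoformat(raw["documentDate"]),
    )


def build_response(request_id: str, inn: Optional[str], cached: bool,
                   details: Optional[str], elapsed_time: float) -> bytes:
    """Encode the output contract as a JSON message body."""
    payload = {
        "requestId": request_id,
        "inn": inn,
        "cached": cached,
        "details": details,
        "elapsedTime": elapsed_time,
    }
    return json.dumps(payload, ensure_ascii=False).encode("utf-8")
```

A missing key raises KeyError and a malformed date raises ValueError, which plays the same role as a validation error in the real serializer.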
Project Structure
The general structure of the project directories is as follows.
src
|--inn_service
   |--clients
   |--connection_managers
   |--core
   |--infrastructure
      |--controllers
      |--handlers
      |--http_server
      |--queue_manager
   |--models
   |--repositories
   |--serializers
   |--services
   main.py
.env.example
.gitignore
docker-compose.yaml
pyproject.toml
The root directory contains the tooling for launching the project: docker-compose and a makefile for linting and running tests. The project itself is located in src/inn_service and contains:
clients – clients to connect to the actual data providers (nalog.ru and others);
connection_managers – infrastructure connections to the database, queues;
core – the general code of the application (the application itself, the container);
infrastructure – queue handler manager, handlers themselves, infrastructure controllers;
models – application models, DTO objects;
repositories – a repository for working with the database;
serializers – serializers of input requests, data to be sent to the TIN provider;
services – application services.
We will leave creating the virtual environment to PyCharm and poetry. The short installation command is: poetry install.
Application settings
Let’s start development by creating application settings using BaseSettings from the pydantic package.
The settings live in the settings.py file.
from typing import Optional

from pydantic import BaseSettings


class Settings(BaseSettings):
    app_name: str = "INN service"
    app_request_retry_times: int  # Number of attempts to process an external request
    app_request_retry_sec: int  # Delay in seconds before the request is processed again

    http_host: str
    http_port: int
    http_handler: str = "asyncio"

    mongo_host: str
    mongo_port: str
    mongo_user: str
    mongo_pass: str
    mongo_name: str
    mongo_rs: Optional[str] = None
    mongo_auth: str
    mongo_timeout_server_select: int = 5000

    rabbitmq_host: str
    rabbitmq_port: int
    rabbitmq_user: str
    rabbitmq_pass: str
    rabbitmq_vhost: str
    rabbitmq_exchange_type: str
    rabbitmq_prefetch_count: int
    rabbitmq_source_queue_name: str

    client_nalog_url: str  # URL of the external service for obtaining the TIN
    client_nalog_timeout_sec: int  # Timeout for the service response
    client_nalog_retries: int  # Number of request attempts to the external service
    client_nalog_wait_sec: int  # Wait time between the client_nalog_retries attempts

    @property
    def mongo_dsn(self) -> str:
        mongo_dsn = 'mongodb://{}:{}@{}:{}/{}'.format(
            self.mongo_user,
            self.mongo_pass,
            self.mongo_host,
            self.mongo_port,
            self.mongo_auth
        )
        if self.mongo_rs:
            mongo_dsn += f'?replicaSet={self.mongo_rs}'
        return mongo_dsn

    @property
    def rabbitmq_dsn(self) -> str:
        return 'amqp://{}:{}@{}:{}/{}'.format(
            self.rabbitmq_user,
            self.rabbitmq_pass,
            self.rabbitmq_host,
            self.rabbitmq_port,
            self.rabbitmq_vhost
        )
I suggest not specifying default values for the settings: if something is missing, we will see the problem immediately at startup. At this point you can also prepare the .env.example file containing the default settings for the service.
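The fail-fast idea can be illustrated without pydantic. The sketch below is a standard-library stand-in, not the project's code: RabbitSettings, load_rabbit_settings and the upper-case variable names are assumptions. It also percent-encodes the credentials, which the DSN properties above do not do, so that a password containing "@" or "/" does not break the URL.

```python
import os
from dataclasses import dataclass
from typing import Mapping
from urllib.parse import quote


@dataclass(frozen=True)
class RabbitSettings:
    host: str
    port: int
    user: str
    password: str
    vhost: str

    @property
    def dsn(self) -> str:
        # Percent-encode credentials so reserved characters stay inside the userinfo part.
        return "amqp://{}:{}@{}:{}/{}".format(
            quote(self.user, safe=""),
            quote(self.password, safe=""),
            self.host,
            self.port,
            self.vhost,
        )


def load_rabbit_settings(env: Mapping[str, str] = os.environ) -> RabbitSettings:
    """Read required variables; a missing one raises immediately and names the variable."""

    def required(name: str) -> str:
        try:
            return env[name]
        except KeyError:
            raise RuntimeError(f"Missing required setting: {name}") from None

    return RabbitSettings(
        host=required("RABBITMQ_HOST"),
        port=int(required("RABBITMQ_PORT")),
        user=required("RABBITMQ_USER"),
        password=required("RABBITMQ_PASS"),
        vhost=required("RABBITMQ_VHOST"),
    )
```

With BaseSettings the same effect comes for free: a field without a default that is absent from the environment produces a validation error when Settings() is created.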
Infrastructure connections
Let’s create a connection layer to the RabbitMQ and MongoDB infrastructure using the aio-pika and motor libraries:
poetry add motor aio-pika fastapi uvicorn injector
The connection layer lives in connection_managers and organizes the connections to the database and the queue manager. Let’s add two mixins that provide a mechanism for registering startup and shutdown actions. The startup mechanism is used when the application starts to initialize the connections to RabbitMQ and MongoDB and to create indexes in the database collection. If a connection fails, the application does not start and the error is written to the logs.
from abc import ABC, abstractmethod
from typing import Coroutine


class StartupEventMixin(ABC):

    @abstractmethod
    def startup(self) -> Coroutine:
        raise NotImplementedError


class ShutdownEventMixin(ABC):

    @abstractmethod
    def shutdown(self) -> Coroutine:
        raise NotImplementedError
Using the example of RabbitConnectionManager, we will demonstrate the implementation.
class RabbitConnectionManager(StartupEventMixin, ShutdownEventMixin, EventLiveProbeMixin):

    def startup(self) -> Coroutine:
        return self.create_connection()

    def shutdown(self) -> Coroutine:
        return self.close_connection()

    async def create_connection(self) -> None:
        self.logger.info('Create connection RabbitMQ')
        try:
            self._connection = await connect_robust(self._dsn)
            self._connection.reconnect_callbacks.add(self.on_connection_restore)
            self._connection.close_callbacks.add(self.on_close_connection)
            self.connected = True
        except ConnectionError as exc:
            err_message = f'Rabbit connection problem: {exc}'
            self.logger.error(err_message)
            # Keep the original exception as the cause for easier debugging.
            raise ConnectionError(err_message) from exc

    async def close_connection(self) -> None:
        if self._connection:
            await self._connection.close()

    # ... some code omitted, the full version is on GitHub

    def on_close_connection(self, *args):
        self.logger.error('Lost connection to RabbitMQ...')
        self.connected = False

    def on_connection_restore(self, *args):
        self.logger.info('Connection to RabbitMQ has been restored...')
        # Drop the cached channel and exchange so they are recreated on the new connection.
        self._channel = None
        self._exchange = None
        self.connected = True
When connecting to RabbitMQ, callbacks are registered that react to the connection being lost and restored.
Handler manager
The handler manager manages the listeners (consumers) of the queues. The project uses the concept of dead-letter queues, which makes it possible to set a message aside for some time and return to processing it later. The reason may be a slow response from the provider, temporary provider errors, or a captcha requirement caused by load. The mechanics of dead-letter queues are analyzed in sufficient detail in the article Delayed Retries by RabbitMQ. Each queue handler must store and return a flag indicating whether retries are used, the delay before a message returns to the main queue for processing, and the name of the queue it listens to. The main handler code is in run_handler. The function is expected to return True on successful processing or on an unrecoverable request error (for example, an incorrect message body), and False if the request could not be processed but should be retried later.
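The delayed-retry mechanism can be sketched with two small helpers. The queue arguments x-message-ttl, x-dead-letter-exchange and x-dead-letter-routing-key and the x-death header are standard RabbitMQ features; the function names and the idea of a separate retry queue per source queue are assumptions for illustration, and the project's actual wiring may differ.

```python
from typing import Any, Dict, Mapping, Optional


def retry_queue_arguments(source_queue: str, ttl_ms: int) -> Dict[str, Any]:
    """Arguments for a hypothetical retry queue paired with source_queue.

    A message waits in the retry queue for ttl_ms milliseconds; when it expires,
    RabbitMQ dead-letters it through the default exchange ('') back to source_queue.
    """
    return {
        "x-message-ttl": ttl_ms,
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": source_queue,
    }


def count_retries(headers: Optional[Mapping[str, Any]], retry_queue: str) -> int:
    """How many times the message has already expired in retry_queue, per 'x-death'."""
    if not headers:
        return 0
    for entry in headers.get("x-death", []):
        if entry.get("queue") == retry_queue and entry.get("reason") == "expired":
            return int(entry.get("count", 0))
    return 0
```

The dictionary would be passed as the arguments of the queue declaration, and the counter is what a handler could receive as count_retry.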
Base handler code:
class BaseHandler(ABC):

    def __init__(
            self,
            settings: Settings,
            logger: AppLogger,
            rabbitmq_connection: RabbitConnectionManager
    ) -> None:
        self.settings = settings
        self.logger = logger
        self.rabbitmq_connection = rabbitmq_connection

    @abstractmethod
    def get_use_retry(self) -> bool:
        raise NotImplementedError

    def get_retry_ttl(self) -> int:
        return 0

    @abstractmethod
    def get_source_queue(self) -> str:
        raise NotImplementedError

    def convert_seconds_to_mseconds(self, value: int) -> int:
        return value * 1000

    @abstractmethod
    async def run_handler(
            self,
            message: dict,
            request_id: Optional[str],
            result_queue: Optional[str],
            count_retry: Optional[int] = 0
    ) -> bool:
        raise NotImplementedError
The only subclass is RequestHandler, which implements receiving and processing a message:
class RequestHandler(BaseHandler):

    def __init__(
            self,
            settings: Settings,
            logger: AppLogger,
            rabbitmq_connection: RabbitConnectionManager,
            service: InnService
    ) -> None:
        super().__init__(settings, logger, rabbitmq_connection)
        self.source_queue_name = self.settings.rabbitmq_source_queue_name
        self.retry_times = self.settings.app_request_retry_times
        self.retry_sec = self.settings.app_request_retry_sec
        self.service = service

    def get_source_queue(self) -> str:
        return self.source_queue_name

    def get_use_retry(self) -> bool:
        return True

    def get_retry_ttl(self) -> int:
        return self.retry_sec

    async def run_handler(
            self,
            message: dict,
            request_id: Optional[str],
            result_queue: Optional[str],
            count_retry: Optional[int] = 0
    ) -> bool:
        if count_retry > self.retry_times:
            self.logger.warning(f'Request {request_id} was rejected by excess attempts {self.retry_times} times')
            return True

        self.logger.info(f'Get request {request_id} for response {result_queue}')

        client_data = RequestSerializer.parse_obj(message)
        response = await self.service.get_client_inn(client_data)

        if result_queue:
            json_message = response.dict()
            await self.rabbitmq_connection.send_data_by_queue(json_message, result_queue)

        return True
When a message is received, we check how many times it has already passed through the queue using the count_retry parameter. If the limit is exceeded, we send the message processing status (error) to the output queue and stop processing this message. The RequestSerializer.parse_obj(message) call is not wrapped in a try…except block because the queue manager handles ValidationError errors raised while converting the message.
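How a queue manager might translate the handler outcome into a broker action can be sketched as follows. This is an illustration under assumptions, not the project's QueueManager: dispatch, MalformedMessage and the string results "ack" and "retry" are hypothetical names, and MalformedMessage stands in for pydantic's ValidationError.

```python
from typing import Awaitable, Callable, List

# A handler takes the decoded message and the retry counter and reports whether it is done.
Handler = Callable[[dict, int], Awaitable[bool]]


class MalformedMessage(Exception):
    """Stand-in for a validation error raised while parsing the message body."""


async def dispatch(handler: Handler, message: dict, count_retry: int, log: List[str]) -> str:
    """Run a handler and decide what to do with the message afterwards."""
    try:
        done = await handler(message, count_retry)
    except MalformedMessage as exc:
        # A malformed body will never succeed, so it is acknowledged and dropped.
        log.append(f"invalid message: {exc}")
        return "ack"
    # True: finished (successfully or not), False: send to the retry queue.
    return "ack" if done else "retry"
```

Keeping the validation handling in one place lets every handler call the serializer directly, as run_handler does above.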
Working with the database
The choice fell on MongoDB because of its ease of use, the absence of migrations and its flexible data schema. The task does not require storing dependent data or designing relationships between tables. To work with data, we will use the Repository pattern.
The base repository contains functions for working with data and indexes in Mongo notation, while concrete classes implement the functions the service needs. Indexes are created in the background (the background flag) when the application starts, for which the StartupEventMixin implementation is used. Dataset queries support pagination and sorting.
A concrete class is created for each collection. The project has one repository, for client requests. The data storage model is located in the models directory and is called ClientDataModel. The model uses types supported by MongoDB (datetime instead of date), and the created_at attribute gets its default value through default_factory. The model also has an elapsed_time property for calculating the request processing time and a class method for creating an object from a client request.
class ClientDataModel(BaseModel):
    created_at: datetime = Field(default_factory=datetime.utcnow)
    request_id: str
    first_name: str
    last_name: str
    middle_name: str
    birth_date: datetime
    birth_place: str = Field(default="")
    passport_num: str
    document_date: datetime
    executed_at: Optional[datetime]
    inn: Optional[str]
    error: Optional[str]

    @classmethod
    def create_from_request(cls, request: RequestMqSerializer) -> 'ClientDataModel':
        return ClientDataModel(
            request_id=request.request_id,
            first_name=request.first_name,
            last_name=request.last_name,
            middle_name=request.middle_name,
            birth_date=datetime.combine(request.birth_date, datetime.min.time()),
            passport_num='{} {}'.format(request.document_serial, request.document_number),
            document_date=datetime.combine(request.document_date, datetime.min.time()),
        )

    @property
    def elapsed_time(self) -> float:
        end = self.executed_at or datetime.utcnow()
        return (end - self.created_at).total_seconds()
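The two small conversions used by the model can be tried in isolation. BSON has a datetime type but no date-only type, which is why the model stores dates as midnight datetimes. The helper names below are hypothetical; the logic mirrors datetime.combine and total_seconds as used in the model.

```python
from datetime import date, datetime, time


def to_mongo_datetime(value: date) -> datetime:
    """Represent a calendar date as midnight of that day, a value BSON can store."""
    return datetime.combine(value, time.min)


def elapsed_seconds(created_at: datetime, executed_at: datetime) -> float:
    """Processing time between two timestamps, in seconds."""
    return (executed_at - created_at).total_seconds()
```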
Base repository code:
class BaseRepository(StartupEventMixin):

    def __init__(self, mongodb_connection_manager: MongoConnectionManager, setting: Settings) -> None:
        self.mongodb_connection_manager = mongodb_connection_manager
        self.db_name = setting.mongo_name

    @property
    def collection_name(self) -> str:
        raise NotImplementedError

    @property
    def collection_indexes(self) -> Iterable[IndexDef]:
        raise NotImplementedError

    def startup(self) -> Coroutine:
        return self.create_indexes()

    async def create_index(self, field_name: str, sort_id: int) -> None:
        connection = await self.mongodb_connection_manager.get_connection()
        collection = connection[self.db_name][self.collection_name]
        await collection.create_index([(field_name, sort_id), ], background=True)

    async def create_indexes(self) -> None:
        tasks = []
        for index_item in self.collection_indexes:
            tasks.append(self.create_index(index_item.name, index_item.sort))
        # Schedule index creation without waiting for it to finish.
        asyncio.ensure_future(asyncio.gather(*tasks))

    async def get_one_document(self, criteria: dict) -> Optional[dict]:
        connection = await self.mongodb_connection_manager.get_connection()
        collection = connection[self.db_name][self.collection_name]
        return await collection.find_one(criteria)

    async def get_list_document(
            self,
            criteria: dict,
            sort_criteria: Optional[list] = None,
            limit: Optional[int] = 0,
            skip: Optional[int] = 0,
    ) -> List[dict]:
        if not sort_criteria:
            sort_criteria = []
        connection = await self.mongodb_connection_manager.get_connection()
        cursor = connection[self.db_name][self.collection_name].find(
            criteria,
            limit=limit,
            skip=skip,
            sort=sort_criteria
        )
        result = list()
        async for data in cursor:
            result.append(data)
        return result

    async def save_document(self, data: dict) -> str:
        connection = await self.mongodb_connection_manager.get_connection()
        result = await connection[self.db_name][self.collection_name].insert_one(data)
        return result.inserted_id

    async def update_document(self, criteria: dict, data: dict) -> None:
        connection = await self.mongodb_connection_manager.get_connection()
        await connection[self.db_name][self.collection_name].update_one(criteria, {'$set': data})
Service layer
The service layer does all the necessary processing of the data:
access the database to look for a similar request (by request_id and passport data);
return the result if the data was found;
make an API request;
save the query result to the database;
return a response.
In the service layer, I tried to abstract away from the infrastructure. The response is returned to the calling function, which must know where to send it. In this case, the queue manager knows where to reply thanks to the reply-to field in the request header. The return value is formatted as a DTO object (RequestDTO).
The code of the InnService class:
class InnService:

    def __init__(
            self,
            settings: Settings,
            logger: AppLogger,
            client: NalogApiClient,
            storage: RequestRepository
    ) -> None:
        self.settings = settings
        self.logger = logger
        self.client = client
        self.storage_repository = storage

    async def get_client_inn_from_storage(self, client_data: RequestSerializer) -> Optional[RequestModel]:
        client_passport = f'{client_data.document_serial} {client_data.document_number}'
        client_request = await self.storage_repository.find_request(client_passport, client_data.request_id)
        return client_request

    def update_status(self, model: RequestModel, inn: Optional[str], error: Optional[str]) -> None:
        model.inn = inn
        model.error = error

    async def get_client_inn(self, client_data: RequestSerializer) -> RequestDTO:
        """Get the client's TIN."""
        start_process = datetime.utcnow()
        model = RequestModel.create_from_request(client_data)

        # Look up the data in the database
        existing_data = await self.get_client_inn_from_storage(client_data)
        if existing_data:
            elapsed_time = (datetime.utcnow() - start_process).total_seconds()
            return RequestDTO(
                request_id=client_data.request_id,
                inn=existing_data.inn,
                elapsed_time=elapsed_time,
                cached=True
            )

        # Make the actual request to the Nalog API
        request = NalogApiRequestSerializer.create_from_request(client_data)
        error, result = None, ''
        try:
            result = await self.client.send_request_for_inn(request)
        except NalogApiClientException as exception:
            self.logger.error('Error request to Nalog api service', details=str(exception))
            error = str(exception)

        self.update_status(model, result, error)
        await self.storage_repository.save_request(model)

        return RequestDTO(
            request_id=model.request_id,
            inn=model.inn,
            details=model.error,
            elapsed_time=model.elapsed_time
        )
The second service in the application polls the infrastructure for the health check. Infrastructure managers that need to be monitored should inherit from the EventLiveProbeMixin mixin and implement the is_connected function.
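The aggregation idea behind such a health-check service can be shown with a minimal sketch. LiveProbeSketch is a hypothetical name, and the sketch assumes is_connected is a synchronous callable returning bool; the real LiveProbeService in the project may differ.

```python
from typing import Callable, List


class LiveProbeSketch:
    """Collects zero-argument callables that report whether a component is connected."""

    def __init__(self) -> None:
        self._components: List[Callable[[], bool]] = []

    def add_component(self, is_connected: Callable[[], bool]) -> None:
        self._components.append(is_connected)

    def is_alive(self) -> bool:
        # Healthy only if every registered component reports a live connection.
        return all(check() for check in self._components)
```

An HTTP liveness endpoint could then return a success status when is_alive() is true and an error status otherwise, which is what Kubernetes needs to decide whether to restart the pod.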
Client
The NalogApiClient client performs a POST request to https://service.nalog.ru/inn.do and parses the response status. The function that actually sends the request is wrapped in the retry decorator, which repeats the request when errors occur. The retry settings are part of the general application settings.
class NalogApiClient:
    CLIENT_EXCEPTIONS = (
        NalogApiClientException,
        aiohttp.ClientProxyConnectionError,
        aiohttp.ServerTimeoutError,
    )

    def __init__(self, settings: Settings, logger: AppLogger):
        self.nalog_api_service_url = settings.client_nalog_url
        self.request_timeout = settings.client_nalog_timeout_sec
        self.retries_times = settings.client_nalog_retries
        self.retries_wait = settings.client_nalog_wait_sec
        self.logger = logger
        self.timeout = aiohttp.ClientTimeout(total=self.request_timeout)

    @property
    def _headers(self):
        return {
            "Accept": "application/json, text/javascript, */*; q=0.01",
            "Accept-Language": "ru-RU,ru",
            "Connection": "keep-alive",
            "Origin": "https://service.nalog.ru",
            "Referer": self.nalog_api_service_url,
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "Sec-GPC": "1",
            "X-Requested-With": "XMLHttpRequest",
        }

    async def send_request_for_inn(self, nalog_api_request: NalogApiRequestSerializer) -> Optional[str]:
        self.logger.debug(f'Request to nalog api service for {nalog_api_request.client_fullname}')

        form_data = nalog_api_request.dict(by_alias=True)

        @retry(self.CLIENT_EXCEPTIONS, logger=self.logger, attempts=self.retries_times, wait_sec=self.retries_wait)
        async def make_request(client_session: aiohttp.ClientSession):
            async with client_session.post(url=self.nalog_api_service_url, data=form_data) as response:
                if response.status not in [http.HTTPStatus.OK, http.HTTPStatus.NOT_FOUND]:
                    response_text = await response.text()
                    raise NalogApiClientException(response_text)
                data = await response.json()
                code = data.get('code')
                captcha_required = data.get('captchaRequired')
                if captcha_required:
                    raise NalogApiClientException(f'Captcha required for request {nalog_api_request.client_fullname}')
                if code == 0:
                    return 'no inn'
                elif code == 1:
                    return data.get('inn')
                else:
                    raise NalogApiClientException(f'Unable to parse response! Details: {response}')

        async with aiohttp.ClientSession(timeout=self.timeout, headers=self._headers) as session:
            return await make_request(session)
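The retry decorator used above is a project helper whose source is not shown in the article. A minimal sketch of how such an async decorator could work is given below; it keeps the attempts and wait_sec parameters seen in the call above but leaves out the logger argument, and the real implementation may behave differently.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Tuple, Type


def retry(exceptions: Tuple[Type[BaseException], ...], attempts: int = 3, wait_sec: float = 0.0):
    """Retry an async function when it raises one of the given exceptions."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    def decorator(func: Callable[..., Awaitable[Any]]):
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    if attempt == attempts:
                        raise  # out of attempts: propagate the last error
                    await asyncio.sleep(wait_sec)

        return wrapper

    return decorator
```

Exceptions that are not listed propagate immediately, so only errors considered temporary (timeouts, a captcha requirement) trigger another attempt.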
Container
The container builds the necessary dependencies and passes them to the application. Our container is assembled in the ApplicationContainer class. All dependencies are registered as singletons (@singleton) through provider methods (@provider) from the injector library. When writing tests, you need to prepare another container with fake or stub objects.
The main interest in working with the container is concentrated in the ContainerManager class, which checks whether the registered bindings implement the EventSubscriberMixin and EventLiveProbeMixin mixins. The get_event_collection function generates the lists of callback functions for starting and stopping the application. Iterating over the lists and calling the callbacks is implemented in run_startup and run_shutdown.
class ContainerManager:

    def __init__(self, cls_container: Type[Container]) -> None:
        self._container = Injector(cls_container())
        self._bindings = self._container.binder._bindings

    def get_container(self) -> Injector:
        return self._container

    def get_live_probe_handlers(self) -> List[Type[Callable]]:
        result = []
        binding_collection = [binding for binding in self._bindings]
        for binding in binding_collection:
            if issubclass(binding, EventLiveProbeMixin):
                binding_obj = self._container.get(binding)
                result.append(binding_obj.is_connected)
        return result

    def get_startup_handlers(self):
        handlers = []
        binding_collection = [binding for binding in self._bindings]
        for binding in binding_collection:
            if issubclass(binding, StartupEventMixin):
                binding_obj = self._container.get(binding)
                handlers.append(binding_obj.startup())
        return handlers

    def get_shutdown_handlers(self):
        handlers = []
        binding_collection = [binding for binding in self._bindings]
        for binding in binding_collection:
            if issubclass(binding, ShutdownEventMixin):
                binding_obj = self._container.get(binding)
                handlers.append(binding_obj.shutdown())
        return handlers

    async def run_startup(self) -> None:
        exception = None
        for handler in self.get_startup_handlers():
            if exception:
                # After a failure the remaining coroutines are closed instead of awaited.
                handler.close()
            else:
                try:
                    await handler
                except Exception as exc:
                    exception = exc
        if exception is not None:
            raise exception

    async def run_shutdown(self) -> None:
        handlers = []
        for handler in self.get_shutdown_handlers():
            handlers.append(handler)
        await asyncio.gather(*handlers)
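The discovery of startup-capable components can also be written so that each coroutine is created only when it is about to run, which removes the need to close unused coroutines after a failure. The sketch below is an alternative under assumptions, not the project's code: Startable and start_all are hypothetical names, and it works on a list of instances rather than on injector bindings.

```python
from abc import ABC, abstractmethod
from typing import Iterable, List


class Startable(ABC):
    """Marker interface for components that need asynchronous initialization."""

    @abstractmethod
    async def startup(self) -> None:
        ...


async def start_all(components: Iterable[object]) -> List[str]:
    """Start every component that implements Startable, in order.

    The first exception stops the loop and propagates to the caller.
    """
    started = []
    for component in components:
        if isinstance(component, Startable):
            # The coroutine is created and awaited in one step.
            await component.startup()
            started.append(type(component).__name__)
    return started
```

Components that do not implement the interface are simply skipped, just as bindings that fail the issubclass check are skipped in ContainerManager.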
Below is the container itself, in which the necessary class instances are created. A similar container will be created when writing tests.
class ApplicationContainer(Container):

    @singleton
    @provider
    def provide_settings(self) -> Settings:
        return Settings()

    # ... some code omitted

    @singleton
    @provider
    def provide_mongodb_connection(self, settings: Settings, logger: AppLogger) -> MongoConnectionManager:
        return MongoConnectionManager(settings, logger)

    @singleton
    @provider
    def provide_rabbitmq_connection(self, settings: Settings, logger: AppLogger) -> RabbitConnectionManager:
        return RabbitConnectionManager(settings, logger)

    @singleton
    @provider
    def provide_nalog_api_client(self, settings: Settings, logger: AppLogger) -> NalogApiClient:
        return NalogApiClient(settings, logger)

    @singleton
    @provider
    def provide_request_repository(self, settings: Settings, mongo_connection: MongoConnectionManager) -> RequestRepository:
        return RequestRepository(mongo_connection, settings)
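For tests, a container of this kind could return a stub in place of NalogApiClient. A minimal sketch of such a test double is shown below. FakeNalogApiClient is a hypothetical name; it relies only on the send_request_for_inn method and the client_fullname attribute that appear in the client code above, and the 'no inn' fallback mirrors what the real client returns when no TIN is found.

```python
from typing import Dict, Optional


class FakeNalogApiClient:
    """Test double: returns canned TINs instead of calling the external service."""

    def __init__(self, known: Dict[str, str]) -> None:
        self.known = known  # maps a client's full name to a made-up TIN
        self.calls = 0      # lets a test assert how often the "API" was hit

    async def send_request_for_inn(self, request) -> Optional[str]:
        self.calls += 1
        return self.known.get(request.client_fullname, "no inn")
```

A test could use the calls counter to check that a repeated request is served from the database and does not reach the client a second time.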
Application
The main task of the application is to put everything together and start a common thread of execution. The application assembly code is extremely simple, because the container manager takes care of initializing the classes. The application is assembled in the following steps:
receiving a container and passing it to the container manager;
initializing the event loop;
adding handlers for the queues;
launching the initializers of the infrastructure layer (the startup mixin implementations);
launching the FastAPI web server that answers the health check;
enabling the global error handler.
class Application:

    def __init__(self, cls_container: Type[Container]) -> None:
        self.loop = asyncio.get_event_loop()
        self.container_manager = ContainerManager(cls_container)
        self.container = self.container_manager.get_container()
        self.settings = self.container.get(Settings)
        self.logger = self.container.get(AppLogger)
        self.live_probe_service = self.container.get(LiveProbeService)
        self.queue_manager = self.container.get(QueueManager)
        self.app_name = self.settings.app_name
        self.http_server = None

    def init_application(self):
        self.http_server = ServerAPIManager(self.container)

        request_handler = self.container.get(RequestHandler)
        self.queue_manager.add_handler(request_handler)

        live_probe_handlers = self.container_manager.get_live_probe_handlers()
        for handler in live_probe_handlers:
            self.live_probe_service.add_component(handler)

    def run(self) -> None:
        self.logger.info(f'Starting application {self.app_name}')

        self.init_application()

        try:
            self.loop.run_until_complete(self.container_manager.run_startup())

            tasks = asyncio.gather(
                self.http_server.serve(),
                self.queue_manager.run_handlers_async(),
            )
            self.loop.run_until_complete(tasks)

            self.loop.run_forever()
        except BaseException as exception:
            exit(1)
        finally:
            self.loop.run_until_complete(self.container_manager.run_shutdown())

            self.loop.close()
            self.logger.info('Application disabled')
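The lifecycle in run can be condensed into a single coroutine: start the infrastructure, run the long-lived workers together, and always shut down. The sketch below uses hypothetical names (run_application and its parameters) and plain callables instead of the container manager, the HTTP server and the queue manager.

```python
import asyncio
from typing import Awaitable, Callable, List

AsyncStep = Callable[[], Awaitable[None]]


async def run_application(startup: AsyncStep, workers: List[AsyncStep], shutdown: AsyncStep) -> None:
    """Run startup, then all workers concurrently; shutdown runs even if something fails."""
    try:
        await startup()
        # gather() finishes when every worker finishes, or raises the first worker error.
        await asyncio.gather(*(worker() for worker in workers))
    finally:
        await shutdown()
```

It would be started with asyncio.run(run_application(...)), which creates and closes the event loop itself instead of managing the loop object by hand.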
The application is started from the main script using the small typer library, which makes it convenient to handle command-line options.
import typer

from core.application import Application
from app_container import ApplicationContainer


def main():
    try:
        application = Application(ApplicationContainer)
        application.run()
    except BaseException as exc:
        typer.echo(f'Error starting application. Details: {str(exc)}')


if __name__ == "__main__":
    typer.run(main)
How to run it all?
The project contains a docker-compose file to build. You need to copy the .env.example file to .env.
docker compose build
docker compose up
After executing these commands, a mongodb instance will be launched on port 27017 and rabbitmq on port 5672 with an admin panel on 15672. The RabbitMQ admin panel can be accessed at http://localhost:15672. In the queues section, you need to create a new queue to which the results of the service will be sent and bind it to the default exchange (direct).
To be continued
This article covered developing a Python application that uses queues, a dependency container and health-check support. I suggest discussing the architecture in the comments and then continuing to develop the service. In the next iterations, I plan to add a hypothetical paid client, which will be used after a certain number of requests to the free service, and finally to write tests.
Materials that may be useful for understanding the material: