Asynchronous ETL process in Python

I continue the series of articles on developing ETL processes in Python. This time, we are converting the synchronous ETL process from Writing an ETL Process in Python into an asynchronous one.

Let me remind you that we are interested in an ETL process (extract, transform, load) implemented through the “Chain of Responsibility” pattern. As an example, we will develop three handlers that pass data sequentially from one to the next. Each subsequent handler decides whether it can process the request itself and whether the request is worth passing further along the chain.

Using Queues

In the traditional “synchronous” solution, we passed data sequentially from function to function. For an asynchronous project this approach will not work, so let’s use queues as intermediate storage between the stages of our ETL process.

A dedicated function is responsible for each stage. As a result, each subsequent function is abstracted from the code of the previous one and “looks” only into the queue.

Preparing the asynchronous project

For the technical implementation we will use the AnyIO package. What tipped the decision in its favor is the library’s support for memory object streams, which are designed for implementing the producer-consumer pattern with multiple tasks. Calling create_memory_object_stream() gives us a pair of object streams, one for sending and one for receiving. Essentially they work like queues, but with support for closing and asynchronous iteration.
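
To make that queue-like behaviour concrete, here is a minimal self-contained sketch (not from the original project) showing sending, closing and asynchronous iteration:

import anyio
from anyio import create_memory_object_stream

async def demo() -> None:
    send, receive = create_memory_object_stream(3)  # room for three items, so send() will not block
    async with send:  # closing the send side signals the end of the stream
        for i in range(3):
            await send.send(i)
    async with receive:
        async for item in receive:  # iteration ends once the send side is closed
            print(item)  # -> 0, 1, 2

anyio.run(demo)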

Below is a code example with the three functions extract, transform and load, and the two intermediate queues:

from anyio import create_memory_object_stream
from anyio.streams.memory import MemoryObjectSendStream, MemoryObjectReceiveStream

send_transform_stream, receive_transform_stream = create_memory_object_stream()
send_load_stream, receive_load_stream = create_memory_object_stream()

async def extract(send_stream: MemoryObjectSendStream) -> None:
    """ Extracts data from the database and passes it to the queue
        :param send_stream: queue into which the results of this function are written
    """

    async with engine.connect() as conn:
        statement = select(source.c.id, source.c.number)

        cursor = await conn.execute(statement)
        async with send_stream:

            record = cursor.fetchone()  # fetchmany could be used here to pull data in batches
            while record:
                await send_stream.send(record._mapping)
                record = cursor.fetchone()


async def transform(receive_stream: MemoryObjectReceiveStream, send_stream: MemoryObjectSendStream) -> None:
    """ Example of an intermediate command
        :param receive_stream: queue from which this function takes its input data.
        :param send_stream: queue into which the results of this function are written.
    """

    async with receive_stream, send_stream:
        async for record in receive_stream:
            # processing logic goes here
            await send_stream.send(record)


async def load(receive_stream: MemoryObjectReceiveStream) -> None:
    """ Final stage of the chain of commands
        :param receive_stream: queue from which this function takes its input data
    """

    async with receive_stream:
        async for record in receive_stream:
            pass  # final processing logic goes here
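
The snippets above rely on engine, source and select, which the article assumes are defined elsewhere in the project. A minimal sketch of what that SQLAlchemy setup might look like (the connection string and table layout are my assumptions, not taken from the project):

from sqlalchemy import Column, Integer, MetaData, Table, select
from sqlalchemy.ext.asyncio import create_async_engine

# Hypothetical async engine; substitute your own connection string
engine = create_async_engine("postgresql+asyncpg://user:password@localhost/etl")

metadata = MetaData()

# Hypothetical source table holding the integers the process works on
source = Table(
    "source",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("number", Integer),
)

Later snippets also call await logger.debug(...), which implies an asynchronous logger whose methods are coroutines (for example aiologger); that setup is likewise assumed rather than shown here.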

The next task is to run our asynchronous functions. Task handling in AnyIO broadly follows the trio model. Tasks can be created (spawned) using task groups. A task group is an asynchronous context manager that makes sure all of its child tasks are completed one way or another after exiting the context block. If a child task, or the code in the nested context block, raises an exception, all child tasks are cancelled. Otherwise, the context manager simply waits for all child tasks to complete before continuing.
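
A small self-contained sketch of that cancellation behaviour (the task and messages are made up for illustration):

import anyio
from anyio import create_task_group, sleep

async def worker() -> None:
    try:
        await sleep(10)  # never completes
    except anyio.get_cancelled_exc_class():
        print("worker cancelled")  # the task group cancelled us
        raise  # cancellation exceptions must always be re-raised

async def main() -> None:
    async with create_task_group() as tg:
        tg.start_soon(worker)
        await sleep(0.1)
        raise RuntimeError("boom")  # cancels worker before the block exits

anyio.run(main)  # prints "worker cancelled", then the RuntimeError propagates

Depending on the AnyIO version, the RuntimeError may arrive wrapped in an ExceptionGroup.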

Let’s take an example from the documentation as a basis:

from anyio import create_task_group, run, create_memory_object_stream
from anyio.streams.memory import MemoryObjectSendStream, MemoryObjectReceiveStream

import etl  # the module with the extract, transform and load functions shown above

async def main():

    send_transform_stream, receive_transform_stream = create_memory_object_stream()
    send_load_stream, receive_load_stream = create_memory_object_stream()

    async with create_task_group() as tg:
        tg.start_soon(etl.extract, send_transform_stream)
        tg.start_soon(etl.transform, receive_transform_stream, send_load_stream)
        tg.start_soon(etl.load, receive_load_stream)


if __name__ == '__main__':
    run(main)
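
One detail worth knowing: by default create_memory_object_stream() creates an unbuffered stream, so every send() blocks until a consumer is ready to receive. If the producer should be allowed to run ahead of the consumers, a buffer size can be passed (the value below is arbitrary):

send_transform_stream, receive_transform_stream = create_memory_object_stream(100)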

Filling the code skeleton with logic

In this section, I will use the same subsection names as in the previous article so that you can compare if you wish.

Formulation of the problem

The task is fictitious; the main point is to “drive” data from function to function.

The database has a table containing integers. The ETL process must go through all the records in the table, square each number, and print the result to the console. For every even number, it must additionally print the informational message “the square of an even number”. If the number read from the database is 3, processing of that record is interrupted and we move on to the next number.
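
For example, for source rows containing 2, 3 and 5, the chain would log roughly the following (the exact format depends on the logger configuration):

2 an even number                    # transform, info
the square of an even number = 4    # load, info
3 skip load stage                   # transform, error; the record never reaches load
25                                  # load, info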

First function

The task of the first handler in our fictitious “chain of responsibility” is to run an SQL query against a table of numbers and pass the resulting rows one by one into the queue:

async def extract(send_stream: MemoryObjectSendStream) -> None:
    """ Extracts rows from the database and passes them to the queue
        :param send_stream: queue into which the results of this function are written
    """

    async with engine.connect() as conn:
        statement = select(source.c.id, source.c.number)

        cursor = await conn.execute(statement)
        async with send_stream:

            while record := cursor.fetchone():  # fetchmany could be used here to pull data in batches
                await logger.debug(f'send {record._mapping}')
                await send_stream.send(record._mapping)
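
As the inline comment suggests, fetchmany() can pull rows in batches instead of one at a time; a sketch of that variant (the batch size of 500 is arbitrary):

while batch := cursor.fetchmany(500):  # inside the `async with send_stream:` block
    for record in batch:
        await logger.debug(f'send {record._mapping}')
        await send_stream.send(record._mapping)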

Second and all intermediate functions

All intermediate functions in the “chain of responsibility” have three tasks: receive an object from the queue, process it, and pass it on to the next queue. Continuing our fictional example:

async def transform(receive_stream: MemoryObjectReceiveStream, send_stream: MemoryObjectSendStream) -> None:
    """ Example of an intermediate command
        :param receive_stream: queue from which this function takes its input data.
        :param send_stream: queue into which the results of this function are written.
    """

    foo: int | str  # annotation for mypy

    async with receive_stream, send_stream:
        async for record in receive_stream:
            await logger.debug(f'transform function received {record}')

            # Processing logic
            new_number = record["number"] ** 2
            if record["number"] % 2 == 0:
                foo = "an even number"
                await logger.info("{} {}".format(record["number"], foo))
            elif record["number"] == 3:
                await logger.error("{} {}".format(record["number"], "skip load stage"))
                continue  # cut the chain short for this record
            else:
                foo = 0

            await send_stream.send((new_number, foo))

The if/elif/else branches show that you can control which data sets get sent to the next stage, and with continue you can cut the chain of responsibility short entirely for a given record.

Final function

Technically, this is the function from the previous section, only without the instruction that sends objects on to the next queue. We complete our fictional chain:

async def load(receive_stream: MemoryObjectReceiveStream) -> None:
    """ Final stage of the chain of commands
        :param receive_stream: queue from which this function takes its input data
    """

    async with receive_stream:
        async for record in receive_stream:
            await logger.debug(f'load function received {record}')

            match record:
                case (int(number), str(bar)):
                    await logger.info('the square of {} = {}'.format(bar, number))
                case (int(number), int(bar)):
                    await logger.info(number)
                case _:
                    raise ValueError(f"Unknown structure of {record=}")

Here the business logic dispatches on the structure of the data set through match/case: patterns like int(number) and str(bar) check the type of each tuple element and bind it to a name at the same time.

Drawing conclusions

Asynchronous Python is “complicated” Python, but ETL processes written with it read better than processes built on generators (compare the code example in the section Preparing the asynchronous project). This is a significant plus when building long chains of tasks.

Being proficient at writing asynchronous code gives you an advantage both in code review and in performance.

The repository with the training project is available at https://github.com/s-klimov/etl-template/tree/04-async.
