Async iterator timeout in Python

Introduction

Consider the following problem: we have a microservice architecture in which services communicate through a message broker or through gRPC. Either way, both options provide a full-duplex channel over which one service can send many messages to another, and in the opposite direction the service executing the request can send several responses (for example, when processing streaming data). Delivering the response this way can, in a sense, be called streaming.

Among the tasks that come up when implementing streaming is detecting that the service executing the request has crashed and can no longer continue streaming responses. From the outside we cannot tell what exactly happened: either the next portion of the response will still be processed and sent, only with a delay, or the transmission has been interrupted and we need to report the error upstream.

In HTTP, for example, the Content-Length header can be used to check that a response was read correctly. It is enough to count the body bytes read from the socket and compare the total with the header value. If they match, we got everything; if they do not match and the socket is closed, it is an error. However, announcing the amount of data in the first portion of the response is not a universal solution, because it is not always possible to know in advance how much data will be transmitted.

An architecture built on message brokers, moreover, keeps the connection open permanently. All we know is that responses to a previously made request arrive from a particular queue, and that one of them will carry an end mark signalling that the request has been processed and the answer has been sent and received in full. Until such a mark arrives, all we can do is keep waiting. But you can wait forever.

When you need to detect that a response has not arrived at all, or that only part of it has arrived and the rest never shows up, you can use timeouts. This seems to be the only fairly simple and sane solution that does not require significant changes to the code infrastructure.

Lazy evaluation infrastructure in Python

For every kind of multitasking (processes, threads, asyncio), Python provides a set of objects and methods that support timeouts. For example, the concurrent.futures module provides a Future class that represents the result of a deferred evaluation. The result() method of Future accepts a timeout argument; when it is set to a value other than None, a TimeoutError is raised if the result is not set within the allotted time. The same goes for the Queue class, designed for organizing a queue, whose get() and put() methods accept a timeout. But both Queue and the Future from concurrent.futures apply only to inter-thread and inter-process communication. When it comes to asynchronous interaction, the similar asyncio objects no longer support timeouts out of the box. Instead, asyncio provides the wait_for function, which takes a timeout and raises asyncio.TimeoutError once it expires.
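As a rough illustration of this contrast, the following sketch times out a thread-based Future through result(timeout=...) and a coroutine through asyncio.wait_for. The slow_job and slow_coroutine helpers and the specific delays are made up for the example.

```python
import asyncio
import concurrent.futures
import time


def slow_job():
    """Hypothetical blocking task that takes longer than we are willing to wait."""
    time.sleep(0.5)
    return "done"


# Thread-based future: result() accepts the timeout directly.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_job)
    try:
        future.result(timeout=0.05)
        thread_outcome = "finished"
    except concurrent.futures.TimeoutError:
        thread_outcome = "timed out"


async def slow_coroutine():
    """Hypothetical coroutine that takes longer than we are willing to wait."""
    await asyncio.sleep(0.5)
    return "done"


async def main():
    # asyncio: the timeout is applied from the outside with wait_for.
    try:
        await asyncio.wait_for(slow_coroutine(), timeout=0.05)
        return "finished"
    except asyncio.TimeoutError:
        return "timed out"


async_outcome = asyncio.run(main())
print(thread_outcome, async_outcome)
```

Note that the thread keeps running after the timeout (the executor still waits for it on exit), whereas wait_for cancels the coroutine it was waiting on.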

But wait_for can only be applied to a coroutine or a future; you cannot simply set a timeout on async for. This becomes especially relevant when using, for example, the aio_pika module, which implements asynchronous interaction with RabbitMQ. Its documentation suggests using async for to organize a consumer.

async with queue.iterator() as queue_iter:
    async for message in queue_iter:
        ...  # handle the message

Any iterator provides a __next__() method, and an asynchronous iterator correspondingly provides an __anext__() coroutine, which you can call directly to get the next value. But then the code turns into something indigestible.

import asyncio

async with queue.iterator() as queue_iter:
    try:
        message = await asyncio.wait_for(queue_iter.__anext__(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

That is not very nice. I would like a more universal wrapper that applies wait_for to __anext__ and can correctly detect that the iterator has issued its last portion (for example, for a queue, as in the example from the aio_pika documentation).

To build an asynchronous iterator with timeout support, take the following steps:

  1. Create a class that implements the __aiter__ method.

  2. __aiter__ returns an object that supports __anext__ calls for getting the next value from the iterator.

  3. Since our iterator serves as a wrapper, its __anext__ method delegates the call to the nested iterator, wrapping every call in wait_for. This way the timeout applies to each iteration rather than to the entire loop.

  4. Implement a termination condition for the wrapper iterator. In Python, iterators raise a special exception (StopAsyncIteration in the asynchronous case) to notify the for or async for construct that the iteration has completed and the sequence has been traversed. Since a wrapper iterator with timeout support must know when to terminate normally, we need to add logic for checking this condition and raising the exception.
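Step 4 relies on how async for itself is driven. The sketch below, using a hypothetical Countdown iterator, shows roughly what the loop does: it calls __aiter__ once, awaits __anext__ repeatedly, and stops when StopAsyncIteration is raised. It is a simplified model, not the interpreter's exact implementation.

```python
import asyncio


class Countdown:
    """Hypothetical async iterator that yields n, n-1, ..., 1."""

    def __init__(self, n):
        self._n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._n <= 0:
            # Tells async for that the sequence has been traversed.
            raise StopAsyncIteration
        value = self._n
        self._n -= 1
        return value


async def manual_loop(iterable):
    """Roughly what `async for item in iterable` does."""
    items = []
    iterator = iterable.__aiter__()
    while True:
        try:
            item = await iterator.__anext__()
        except StopAsyncIteration:
            break
        items.append(item)
    return items


async def with_async_for(iterable):
    return [item async for item in iterable]


manual = asyncio.run(manual_loop(Countdown(3)))
builtin = asyncio.run(with_async_for(Countdown(3)))
print(manual, builtin)
```

Because each value comes from a separate awaited __anext__ call, that call is the natural place to attach a per-item timeout.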

Final code:

import asyncio


class AsyncTimedIterable:
    def __init__(self, iterable, timeout=None):
        # timeout is in seconds (fractions allowed); None means wait indefinitely

        class AsyncTimedIterator:
            def __init__(self):
                # Keep the wrapped iterator; every call to it goes through a timeout
                self._iterator = iterable.__aiter__()

            async def __anext__(self):
                try:
                    # Call __anext__ of the wrapped iterator, with wait_for enforcing the timeout
                    result = await asyncio.wait_for(self._iterator.__anext__(), timeout)
                except asyncio.TimeoutError:
                    # A timeout can be handled here or passed upstream, which also ends the async for
                    raise

                # If the received objects carry a sign that this response is the last one,
                # check for it here and raise StopAsyncIteration to leave the async for.
                # In this version an empty (falsy) result plays the role of that sign.
                if not result:
                    raise StopAsyncIteration

                return result

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        return self._factory()

Now we can wrap queue_iter (the queued message iterator in aio_pika) and specify a timeout:

timed_iter = AsyncTimedIterable(queue_iter, 30)

And then to receive messages, use async for with our wrapper:

async for r in timed_iter:
    pass

If the interval between messages exceeds the specified threshold, asyncio.TimeoutError is raised; if the work completes normally, StopAsyncIteration ends the async for loop.
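To try both outcomes without a broker, here is a small self-contained sketch. It does not reuse the class above; instead it uses an async generator variant of the same idea (the hypothetical helper iter_with_timeout) together with two made-up sources, one that stalls and one that finishes normally. The timeout value is arbitrary.

```python
import asyncio


async def iter_with_timeout(source, timeout):
    """Yield items from `source`, allowing at most `timeout` seconds per item."""
    iterator = source.__aiter__()
    while True:
        try:
            item = await asyncio.wait_for(iterator.__anext__(), timeout)
        except StopAsyncIteration:
            return  # the wrapped iterator finished normally
        yield item


async def stalling_source():
    """Hypothetical producer: two quick items, then a long stall."""
    yield "first"
    yield "second"
    await asyncio.sleep(10)  # simulates a peer that stopped responding
    yield "never delivered"


async def healthy_source():
    """Hypothetical producer that finishes on its own."""
    yield "only"


async def consume(source):
    received = []
    try:
        async for item in iter_with_timeout(source, timeout=0.1):
            received.append(item)
    except asyncio.TimeoutError:
        return received, "timed out"
    return received, "completed"


stalled = asyncio.run(consume(stalling_source()))
healthy = asyncio.run(consume(healthy_source()))
print(stalled, healthy)
```

Items received before the stall are kept, so the caller can decide whether to report the partial result upstream or discard it.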

