Making our own awaitables and event loops

To do this, you can create your own awaitables and custom event loops.

Awaitables

Awaitable objects represent entities that can be used in await expressions for waiting for asynchronous operations to complete. There are three main types of awaitable objects in asyncio: coroutines, futures And tasks.

  1. Coroutines are functions that can be suspended and resumed during execution, allowing other coroutines to run in parallel. A keyword is used to define a coroutine async defand to call – the keyword await.

  2. Futures represent an abstraction of asynchronous computation that can be completed in the future. They provide an interface for waiting for asynchronous operations to complete and receiving their result.

  3. Taski are a special type of futures, which are executable coroutines. They are created using the function asyncio.create_task() to run a coroutine in a separate thread of execution.

Let's move on to creating our own awaitable's

Let's say there is an external service that you want to use in an asynchronous application. To do this, you can create a custom awaitable object that waits for an event from this service. Let's assume that there is also a service that generates an event when new data is available for reading. You can create your own awaitable object that listens for this event:

import asyncio

class CustomAwaitable:
    def __init__(self):
        self._event = asyncio.Event()

    async def wait_for_event(self):
        await self._event.wait()

    def set_event(self):
        self._event.set()

async def main():
    custom_awaitable = CustomAwaitable()
    # Ждем события
    await custom_awaitable.wait_for_event()
    print("Событие произошло!")

asyncio.run(main())

Sometimes you need to wait for not one event, but several. To do this, you can create a custom awaitable object that waits for several events and returns once they have all occurred:

import asyncio

class MultiEventAwaitable:
    def __init__(self, num_events):
        self._num_events = num_events
        self._events = [asyncio.Event() for _ in range(num_events)]

    async def wait_for_events(self):
        await asyncio.gather(*self._events)

    def set_event(self, index):
        self._events[index].set()

async def main():
    multi_event_awaitable = MultiEventAwaitable(3)
    # устанавливаем события
    multi_event_awaitable.set_event(0)
    multi_event_awaitable.set_event(1)
    multi_event_awaitable.set_event(2)
    # Ждем всех событий
    await multi_event_awaitable.wait_for_events()
    print("Все события произошли!")

asyncio.run(main())

Let's say there is a need to interact with an external API that provides data to an asynchronous application. You can create a custom awaitable object that waits for a response from this API and returns data for further processing. Let's implement a simple HTTP client that will use aiohttp library for sending requests to the API and waiting for a response:

import aiohttp

class ExternalAPIAwaitable:
    def __init__(self, url):
        self.url = url

    async def fetch_data(self):
        async with aiohttp.ClientSession() as session:
            async with session.get(self.url) as response:
                return await response.json()

async def main():
    api_awaitable = ExternalAPIAwaitable("https://api.example.com/data")
    data = await api_awaitable.fetch_data()
    print("Получены данные от внешнего API:", data)
    
asyncio.run(main())

ExternalAPIAwaitable represents a custom awaitable object that awaits a response from an external API.

Event Loops

Event Loop represents an infinite loop that waits for asynchronous events to occur and calls the appropriate callbacks to process them. It keeps track of various tasks waiting for I/O operations to complete and switches execution context between them as needed.

The cycle mainly consists of three main components:

  1. Tasks represent asynchronous operations that must be performed in the event loop. Each task is a coroutine that can be run in an event loop.

  2. The event loop contains task queue, which stores all pending tasks. When the event loop starts, it fetches tasks from this queue and executes them.

  3. Each event type has its own handler, which is called when an event occurs. For example, I/O events have handlers that fire when I/O operations complete.

Let's look at examples of creating such custom event loops.

Creating a custom event loop using asyncio.AbstractEventLoop

import asyncio

class CustomEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._running = False

    def run_forever(self):
        self._running = True
        while self._running:
            # обработка событий, выполнение задач и т.д.
            pass

    def stop(self):
        self._running = False

# экземпляр кастомного цикла событий
loop = CustomEventLoop()

# добавление задач в кастомный цикл событий
async def task():
    print("Задача выполняется...")
    await asyncio.sleep(1)
    print("Задача выполнена!")

loop.call_soon(loop.create_task, task())

# запуск
loop.run_forever()

Created a custom event loop, inheriting from asyncio.AbstractEventLoop. Method run_forever() and method stop() control the start and stop of the cycle. Also added an asynchronous task to the event loop using loop.create_task() and started the event loop with loop.run_forever().

asyncio.get_event_loop_policy() to customize the event loop policy

import asyncio

class CustomEventLoopPolicy(asyncio.events.BaseDefaultEventLoopPolicy):
    def _loop_factory(self):
        return CustomEventLoop()

# установка кастомной политики цикла событий
asyncio.set_event_loop_policy(CustomEventLoopPolicy())

# получение цикла событий
loop = asyncio.get_event_loop()

# добавление задачи в кастомный цикл событий
async def task():
    print("Задача выполняется...")
    await asyncio.sleep(1)
    print("Задача выполнена!")

loop.run_until_complete(task())

Created a custom class CustomEventLoopPolicywhich is inherited from asyncio.events.BaseDefaultEventLoopPolicy to manage the creation of a custom event loop. Then set this custom policy using asyncio.set_event_loop_policy() and got the event loop instance with asyncio.get_event_loop(). The task was added to the event loop and executed using loop.run_until_complete().

Extending the Standard Event Loop with Subclassing

import asyncio

class CustomEventLoop(asyncio.get_event_loop().__class__):
    async def run_custom_task(self):
        print("Запуск пользовательской задачи...")
        await asyncio.sleep(1)
        print("Пользовательская задача выполнена!")

# экземпляр кастомного цикла событий
loop = CustomEventLoop()

# запуск пользовательской задачи
async def main():
    await loop.run_custom_task()

# запуск основной асинхронной функции
loop.run_until_complete(main())

Created a custom event loop by extending the standard event loop asyncio.get_event_loop().__class__. Then we added a custom task to this event loop and ran it with loop.run_until_complete().


In conclusion, I would like to recommend you the free webinars of the Python Developer course. Professional:

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *