The Mechanics of Async Await
This post explores the mechanics of async await in the Elixir language. Elixir's concurrency model is an excellent platform for implementing such mechanics. However, do not consider this post as a guide to developing real-life Elixir applications.
The code for this post is available on GitHub.
Introduction
Many of us first encountered async await when learning JavaScript. Because JavaScript offers one of the most popular implementations of this technique, its approach tends to stick in memory as the canonical form of async await. As a result, we unwittingly tie the fundamental principles of asynchronous programming to the specific implementation decisions made in JavaScript.
One of the most common misconceptions surrounding async await is that the paradigm requires a non-blocking, single-threaded runtime. The JavaScript runtime is non-blocking primarily because it is single-threaded. However, asynchronous programming models are quite feasible in both single-threaded and multi-threaded execution environments, in both blocking and non-blocking form.
From Sync to Async
The path from synchronous to asynchronous programming is, in essence, a transition from sequential execution to concurrent execution. Instead of performing one task at a time, we perform several.
Let's think about synchronous and asynchronous programming in terms of pairs of events that occur during execution. We assume that the following events of interest can be observed in an execution trace:
- In synchronous programming:
  - invoke function and return value.
- In asynchronous programming:
  - invoke function and return promise,
  - await promise and return value.
Synchronous execution
Synchronous execution consists of a single interaction between the caller and the callee: invoke a function and return a value. The caller suspends execution until the callee completes its task and returns a value (Figure 1, left).
Asynchronous execution and promises
Asynchronous execution consists of two interactions between the caller and the callee. In the first, the caller invokes a function and gets back a promise; in the second, the caller awaits the promise and gets back a value.
A promise can be interpreted either as a representation of the callee or as a representation of a future value. A promise is either pending, which means the callee is still busy and has not yet returned a value, or completed, which means the callee has finished its work and returned a value.
If the promise is pending at the time of the await, the caller suspends execution and waits until the callee finishes and returns a value (Figure 1, center). If the promise is already completed at the time of the await, the caller continues with the returned value (Figure 1, right).
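As a minimal sketch of these two interactions, Elixir's built-in Task module can stand in for a promise. Task is not the implementation developed in this post; it is used here only because Task.async returns a handle immediately and Task.await suspends the caller until the value is available.

```elixir
# First interaction: invoke function, return promise.
# Task.async returns right away with a handle, while the function
# runs concurrently in its own process.
promise = Task.async(fn -> 21 * 2 end)

# Second interaction: await promise, return value.
# If the task is still running, the caller is suspended here;
# if it has already finished, the value is returned immediately.
value = Task.await(promise)

IO.puts(value)
# prints 42
```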
Event Loop
The async await runtime is commonly called an "Event Loop". An event loop is a scheduler that allows an asynchronous execution to register interest in a specific event. For the purposes of this article, the only event we are interested in is the completion of a promise. Once interest is registered, the execution is suspended. When the promise completes, the event loop resumes the execution.
The rest of this article explains how to implement async await and the event loop in the Elixir language. Why Elixir? Firstly, it’s easy to project async await onto Elixir, which makes for an excellent illustrative implementation. More importantly, this mapping of async await to Elixir dispels the common myth that async await is inherently non-blocking. In fact, it is possible to map asynchronous operations to Elixir processes and purposefully block the process while waiting for a promise.
A Brief Introduction to Elixir
Elixir is a dynamically typed functional programming language that runs on the Erlang Virtual Machine (BEAM).
The key abstraction in the Elixir language is called a "process". It is the smallest independently executable unit with a unique identifier. To find out the identifier of the currently running process, call self(). All code runs in the context of a process. Processes in Elixir run concurrently, but each process executes its instructions sequentially.
# Create a new process; its process identifier (pid) can be used to send it messages
pid = spawn(fn ->
  # This block of code runs in a separate process
  :ok
end)
Processes exchange information and coordinate actions by exchanging messages. Sending a message is a non-blocking operation, which allows the process to continue executing.
# Send a message to the process identified by pid (non-blocking operation)
send(pid, {:Hello, "World"})
In contrast, receiving a message is a blocking operation and the process will be suspended until a suitable message arrives.
# Receive a message (blocking operation)
receive do
  {:Hello, name} ->
    IO.puts("Hello, #{name}!")
end
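The three snippets above can be combined into one round trip. The following is a sketch only: the :greeting reply tag and the one-second timeout are assumptions added for illustration and are not part of the original examples.

```elixir
parent = self()

# The spawned process blocks in receive until a matching message arrives,
# then sends a reply back to the parent.
pid =
  spawn(fn ->
    receive do
      {:Hello, name} -> send(parent, {:greeting, "Hello, #{name}!"})
    end
  end)

# Sending is non-blocking: the parent continues immediately.
send(pid, {:Hello, "World"})

# Receiving is blocking: the parent is suspended until the reply arrives
# (or until the illustrative timeout expires).
greeting =
  receive do
    {:greeting, text} -> text
  after
    1_000 -> :timeout
  end

IO.puts(greeting)
```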
Perhaps the most popular abstraction in Elixir is GenServer. GenServer is a process just like any other process in Elixir. GenServer abstracts all the boilerplate code needed to build a stateful server.
defmodule Counter do
  use GenServer

  # Client API

  # Starts the GenServer
  def start_link() do
    GenServer.start_link(__MODULE__, 0, name: __MODULE__)
  end

  # Synchronous call that returns the current value of the counter
  def value do
    GenServer.call(__MODULE__, :value)
  end

  # Asynchronous call that increments the counter
  def increment do
    GenServer.cast(__MODULE__, :increment)
  end

  # Server callbacks

  # Initialize the GenServer with the initial value
  def init(value) do
    {:ok, value}
  end

  # Handle synchronous calls
  def handle_call(:value, _from, state) do
    {:reply, state, state}
  end

  # Handle asynchronous messages
  def handle_cast(:increment, state) do
    {:noreply, state + 1}
  end
end
Async Await in Elixir
The Async Await module allows the developer to express the concurrent structure of a computation, whereas the event loop implements the concurrent structure of a computation.
We map the asynchronous execution of a function to an Elixir process executing that function. The process identifier (pid) conveniently refers both to the execution and to the promise representing that execution.
We aim to do something like this:
# outer refers to the pid of the outer Elixir process
outer = Async.invoke(fn ->
  # inner refers to the pid of the inner Elixir process
  inner = Async.invoke(fn ->
    42
  end)

  # Use the pid to await the inner promise
  v = Async.await(inner)
  2 * v
end)

# Use the pid to await the outer promise
IO.puts(Async.await(outer))
Library
Let's start with the simple component, the library. Recall that we have only two interactions: in the first we invoke a function and get back a promise, and in the second we await a promise and get back a value. Invoke never suspends the caller, whereas await may suspend the caller if the promise has not yet completed.
defmodule Async do
  def invoke(func, args \\ []) do
    # invoke function, return promise
    # never suspends the caller
    GenServer.call(EventLoop, {:invoke, func, args})
  end

  def await(p) do
    # await promise, return value
    # may suspend the caller
    GenServer.call(EventLoop, {:await, p})
  end
end
In Elixir terms:
- GenServer.call(EventLoop, {:invoke, func, args}) is a blocking call. However, as we will see, the event loop always replies immediately, so the call never suspends the caller.
- GenServer.call(EventLoop, {:await, p}) is a blocking call. As we will see below, the event loop does not always reply immediately, so the call may suspend the caller.
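The difference between an immediate and a deferred reply can be shown in isolation. The sketch below uses a hypothetical Gate module (not part of the post's code) whose handle_call returns {:noreply, ...} and replies later via GenServer.reply, which is what keeps the caller of GenServer.call suspended in the meantime.

```elixir
defmodule Gate do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(waiting), do: {:ok, waiting}

  # Do not reply yet: remember who asked, which keeps the caller suspended.
  @impl true
  def handle_call(:wait, from, waiting), do: {:noreply, [from | waiting]}

  # Reply to every suspended caller, which resumes them.
  @impl true
  def handle_cast({:open, value}, waiting) do
    Enum.each(waiting, &GenServer.reply(&1, value))
    {:noreply, []}
  end
end

{:ok, _pid} = Gate.start_link()

# A helper process opens the gate shortly after the main process starts waiting.
spawn(fn ->
  Process.sleep(100)
  GenServer.cast(Gate, {:open, :go})
end)

# This call does not return until the gate is opened.
result = GenServer.call(Gate, :wait)
IO.inspect(result)
```

Note that GenServer.call gives up after a default timeout of 5000 ms unless a different timeout (such as :infinity) is passed as its third argument, so awaiting a long-running execution this way would need a longer timeout.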
Event Loop
Let's move on to a more complex component – the event loop.
State
The event loop tracks two kinds of entities: promises and awaiters.
%State{
  promises: %{#PID<0.269.0> => :pending, #PID<0.270.0> => {:completed, 42}},
  awaiters: %{
    #PID<0.269.0> => [
      # This data structure lets us defer the reply to a request
      # see GenServer.reply
      {#PID<0.152.0>,
       [:alias | #Reference<0.0.19459.4203495588.2524250117.118052>]}
    ],
    #PID<0.270.0> => []
  }
}
Promises
promises associates a promise identifier with the status of the asynchronous execution that the promise represents. A promise can be in one of two states:
- :pending means that execution is still in progress.
- {:completed, result} means that execution has completed with result.
Awaiters
awaiters associates a promise identifier with a list of execution identifiers waiting for the promise to complete. Each entry in the list corresponds to a process that has awaited the promise and is currently suspended, waiting for the promise to complete. When the promise completes, the waiting processes are notified and resume execution with the result of the promise.
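The post uses a State module without listing it. The following is a hypothetical sketch of what such a module could look like, inferred only from the function names used in the event loop code (new, add_promise, get_promise, set_promise, add_awaiter, get_awaiter); the actual module in the repository may differ.

```elixir
defmodule State do
  # promises: promise id => :pending | {:completed, result}
  # awaiters: promise id => list of deferred callers (the `from` of GenServer.call)
  defstruct promises: %{}, awaiters: %{}

  def new, do: %State{}

  # Register a new pending promise with no awaiters
  def add_promise(state, promise) do
    %{
      state
      | promises: Map.put(state.promises, promise, :pending),
        awaiters: Map.put(state.awaiters, promise, [])
    }
  end

  def get_promise(state, promise), do: Map.get(state.promises, promise)

  # Transition a promise from :pending to {:completed, result}
  def set_promise(state, promise, result) do
    %{state | promises: Map.put(state.promises, promise, {:completed, result})}
  end

  # Remember a caller whose reply is deferred until the promise completes
  def add_awaiter(state, promise, from) do
    %{state | awaiters: Map.update(state.awaiters, promise, [from], &[from | &1])}
  end

  def get_awaiter(state, promise), do: Map.get(state.awaiters, promise, [])
end

# Usage, with the atom :p1 standing in for a pid
state = State.new() |> State.add_promise(:p1)
before = State.get_promise(state, :p1)
state = State.set_promise(state, :p1, 42)
IO.inspect({before, State.get_promise(state, :p1)})
```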
Behavior
By tracking the state of each asynchronous execution via promises, and the dependencies between executions via awaiters, the event loop orchestrates the concurrent execution of code. To do this we need only three methods:
defmodule EventLoop do
  use GenServer
  alias State

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, State.new(), name: __MODULE__)
  end

  def init(state) do
    {:ok, state}
  end

  def handle_call({:invoke, func, args}, {caller, _} = _from, state) do
    # ...
  end

  def handle_call({:await, promise}, {caller, _} = from, state) do
    # ...
  end

  def handle_call({:return, callee, result}, {caller, _} = _from, state) do
    # ...
  end
end
Invoke
The invoke method spawns a new Elixir process and uses its process identifier, callee, as the promise identifier. The process executes the function via apply(func, args) and then calls the event loop's return method, which returns the result of the function and completes the promise.
def handle_call({:invoke, func, args}, {caller, _} = _from, state) do
  # The process id doubles as the promise id
  callee =
    spawn(fn ->
      GenServer.call(EventLoop, {:return, self(), apply(func, args)})
    end)

  new_state =
    state
    |> State.add_promise(callee)

  {:reply, callee, new_state}
end
Await
This is the heart of the event loop. When await is called, we need to distinguish between two cases:
- If the promise is completed, we reply to the caller immediately (without suspending it) and return the result.
- If the promise is pending, we do not reply to the caller immediately (the caller stays suspended), and we register the caller as an awaiter of the promise.
def handle_call({:await, promise}, {caller, _} = from, state) do
  # The central if statement
  case State.get_promise(state, promise) do
    # The promise is pending: defer the reply until it completes
    :pending ->
      new_state =
        state
        |> State.add_awaiter(promise, from)

      {:noreply, new_state}

    # The promise is completed: reply immediately
    {:completed, result} ->
      {:reply, result, state}
  end
end
Return
When a process returns, we iterate through the list of awaiters of its promise and reply to each deferred request with the result, which resumes the suspended callers. In addition, we update the state of the promise from pending to completed.
def handle_call({:return, callee, result}, {caller, _} = _from, state) do
  # Reply to every deferred await request, resuming the suspended callers
  Enum.each(State.get_awaiter(state, callee), fn awaiter ->
    GenServer.reply(awaiter, result)
  end)

  # Mark the promise as completed with the result
  new_state =
    state
    |> State.set_promise(callee, result)

  {:reply, nil, new_state}
end
Launching the application
Now everything is ready to run the application. If you want to reproduce these results, the code is also available as an Elixir Livebook.
IO.inspect(self())

outer = Async.invoke(fn ->
  IO.inspect(self())

  inner = Async.invoke(fn ->
    IO.inspect(self())
    42
  end)

  v = Async.await(inner)
  2 * v
end)

IO.puts(Async.await(outer))
As a result of running the application, we get approximately the following output:
#PID<0.152.0>
#PID<0.269.0>
#PID<0.270.0>
84
Also, look at the entity diagram and sequence diagram, which illustrate the entire structure and behavior of executing functions and promises.
Entity Diagram
Sequence diagram
Outlook
In this article we explored the basic mechanics of async await, but this topic does not end there. For example, there are mechanisms for combining promises, such as Promise.all (wait until all promises from a list are fulfilled) or Promise.one (wait until at least one promise from a list is fulfilled). Another interesting topic is binding promises, when a function returns not a value, but a promise. Study these topics on your own.
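As a starting point for the first of these topics, waiting for all promises in a list can be sketched as awaiting each promise in turn. The sketch below again uses Elixir's built-in Task as a stand-in for the post's Async module, so it is an illustration of the idea rather than an extension of the event loop above.

```elixir
# Invoke three functions concurrently; each call returns a promise-like handle.
promises = for n <- 1..3, do: Task.async(fn -> n * n end)

# Await every promise in order. Each await suspends the caller only while
# that promise is still pending, so the total wait is bounded by the
# slowest execution rather than the sum of all of them.
values = Enum.map(promises, &Task.await/1)

IO.inspect(values)
# prints [1, 4, 9]
```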
Conclusion
Async await is a programming model that elevates concurrency to a first-class concern. With async await, the developer expresses the concurrent structure of a computation, while the event loop executes it.