std::execution has been accepted into C++26 — time to get acquainted

A unified framework for asynchrony and parallelism, the ability to share computing resources across different code bases without complex integration, ease of use and flexibility – all of this is what C++ has never had.

At the end of June, the standards committee approved the inclusion of std::execution (P2300) in C++26 – a proposal intended to solve exactly these problems. Let's figure it out!

Let's start from afar

Imagine the situation: you are developing a service that shovels JSON around and does some useful computation. For example, image compression – a relatively heavy operation in terms of CPU consumption. At the core of your service is a thread pool where all the computation runs. At some point you caught the AI hype and decided to add classification of the objects in the images to the same service. To do this, you hooked a third-party inference library (inference – running new data through a trained model) into the project. When you started testing the new version of the service, you noticed a serious drop in performance, even adjusted for the fact that more work is now being done. Digging in, it turned out that the new library has its own thread pool, so while the service is running there is a constant fight over a limited number of CPU cores, with context switches and cache thrashing.

What are the options for solving the problem?

(For simplicity, we assume that each individual compression or inference job runs on a single thread.)

  1. Shrink both thread pools so that there is no competition for cores. This solves the resource contention problem, but the cores will sit idle part of the time – for example, when there are inference tasks but no compression tasks. This option also assumes that the author of the inference library thought through an interface for accessing and configuring the internal thread pool, which is not always the case.

  2. Integrate our service's work distribution with the library so that a common thread pool is used – then we get optimal resource utilization. In practice, however, neither our service nor the library is likely to be ready for such integration, because there is no single standard framework (or at least a set of patterns) that everyone follows. Such integration will require, at a minimum, expensive refactoring of the service, and quite possibly a fork and refactoring of the third-party library. And do not forget that your project may use more than one such library, and the integration will be different with each of them. All of this is very expensive and time-consuming. And once you start thinking about things like early cancellation of execution, it gets even worse.

Unless the author of this article has forgotten (or deliberately concealed!) some obviously better alternative, both options are unpleasant.

So what to do?

std::execution offers a standard model of asynchrony that covers the compatibility of asynchronous interfaces and the sharing of computing resources, and also provides a set of basic algorithms for building complex asynchronous computation graphs.

In our example, if the author of the third-party library provides an interface compatible with std::execution, integrating the computing resources will be practically free.
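To make that concrete, below is a minimal sketch of what such an integration could look like on top of stdexec. Everything in the inference namespace is hypothetical – an invented library API that accepts any scheduler instead of owning a thread pool of its own:

#include <exec/static_thread_pool.hpp>
#include <stdexec/execution.hpp>

#include <iostream>
#include <string>
#include <utility>

// Hypothetical third-party API: the library takes any scheduler and
// returns a sender; the body here is a stub standing in for real inference.
namespace inference {
    auto classify(stdexec::scheduler auto sch, int image) {
        return stdexec::on(sch,
            stdexec::just(image)
            | stdexec::then([](int) { return std::string{"cat"}; }));
    }
}

int main() {
    // One pool for the whole process: our own work and the library's
    // inference work share the same cores.
    exec::static_thread_pool pool{4};
    auto sch = pool.get_scheduler();

    auto work =
        stdexec::on(sch, stdexec::just(42 /* stand-in for an image */))
        | stdexec::let_value([sch](int img) { return inference::classify(sch, img); });

    auto [label] = stdexec::sync_wait(std::move(work)).value();
    std::cout << label << '\n';  // prints "cat"
}

The key design point is that the library never creates threads of its own: the caller decides where its work runs.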

Basic example of use

Try it on Godbolt

(The code is taken from the proposal, but slightly tweaked to work with the stdexec library – one of the implementations of the proposal; everything except thread_pool is the same as in the standard.)

scheduler auto sch = thread_pool.get_scheduler();

sender auto begin = schedule(sch);
sender auto hi = then(begin, []{ 
    std::cout << "Hello world! Have an int.";
    return 13;
});
sender auto add_42 = then(hi, [](int arg) { return arg + 42; });

auto [i] = sync_wait(add_42).value();

What happens here is this:

  1. A scheduler is requested, associated with a specific computing resource – in this case the resource is an ordinary thread pool. The thread pool itself is not part of the standard, unlike the scheduler interface: thread pool authors will have to write their own schedulers.

  2. schedule(scheduler) – this function returns a sender (hence the informal name of the proposal – Senders/Receivers), a unit of asynchronous work from which you can start building a computation graph executed on the resource associated with the passed scheduler. More on senders below.

  3. then – takes a sender and a functor. When the passed sender completes, the functor is called with its result.

  4. then, like other sender algorithms, can be combined in any order; the main thing is that the input and output data types match. In our example, when hi completes, its result – 13 – is passed on to add_42, which in turn returns 13 + 42.

  5. Probably the most important thing to understand in this example: no work is performed until the graph is explicitly started. Calls to schedule and then only define the structure of the graph. For work to start being submitted to the scheduler, the graph must be launched.

  6. sync_wait – launches the asynchronous graph, synchronously waits for it to complete (yes, this defeats the purpose in this form, but it is convenient for an example), and reads the result.
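For reference, here is a complete, compilable version of the same example on top of stdexec – the thread pool declaration and the includes are the parts the snippet above leaves out:

#include <exec/static_thread_pool.hpp>
#include <stdexec/execution.hpp>

#include <iostream>
#include <utility>

int main() {
    // The thread pool is not part of the standard; this one comes from stdexec.
    exec::static_thread_pool thread_pool{4};

    stdexec::scheduler auto sch = thread_pool.get_scheduler();

    stdexec::sender auto begin = stdexec::schedule(sch);
    stdexec::sender auto hi = stdexec::then(begin, [] {
        std::cout << "Hello world! Have an int.";
        return 13;
    });
    stdexec::sender auto add_42 = stdexec::then(hi, [](int arg) { return arg + 42; });

    auto [i] = stdexec::sync_wait(std::move(add_42)).value();
    std::cout << i << '\n';  // prints 55
}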

Key abstractions

Computing resource – something that can run computations. For example, a thread pool, a GPU, or simply the current thread.

Scheduler – essentially a standardized interface for accessing an arbitrary computing resource. A scheduler can also encapsulate a work-distribution strategy: for example, thread affinity, i.e. keeping related work on a particular thread inside the thread pool.

Sender – an object that describes some computation. I think the choice of name is not obvious to most people (including me); the idea is that a sender “sends” the values produced by the computation. Senders can be composed with each other to build computation graphs. A sender can be single-shot or reusable, it can be forked (have more than one continuation attached), and even canceled!
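The “forked” part can be illustrated with split from stdexec – a minimal sketch in which one sender gets two continuations, both observing the same value:

#include <stdexec/execution.hpp>

#include <cassert>
#include <utility>

int main() {
    // split lets more than one continuation attach to the same sender;
    // both branches below observe the single value that just(21) sends.
    auto shared  = stdexec::split(stdexec::just(21));
    auto doubled = stdexec::then(shared, [](int v) { return v * 2; });
    auto squared = stdexec::then(shared, [](int v) { return v * v; });

    auto [d, s] = stdexec::sync_wait(
        stdexec::when_all(std::move(doubled), std::move(squared))).value();
    assert(d == 42 && s == 441);
}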

Receiver – a callback with three completion channels: success with the operation's result, failure with error information, and stopped, which happens when the operation is canceled. Application developers usually do not interact with receivers – they are meant for library authors.
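For the curious, here is a minimal sketch of the receiver side, written against stdexec (application code normally never does this): a receiver is just a type with the three completion channels, connect ties it to a sender, and start launches the resulting operation:

#include <stdexec/execution.hpp>

#include <exception>
#include <iostream>

// A hand-written receiver with the three completion channels.
struct print_receiver {
    using receiver_concept = stdexec::receiver_t;

    void set_value(int v) noexcept { std::cout << "value: " << v << '\n'; }
    void set_error(std::exception_ptr) noexcept { std::cout << "error\n"; }
    void set_stopped() noexcept { std::cout << "stopped\n"; }
};

int main() {
    // connect pairs a sender with a receiver into an operation state;
    // start runs it (here synchronously, since just completes immediately).
    auto op = stdexec::connect(stdexec::just(42), print_receiver{});
    stdexec::start(op);  // prints "value: 42"
}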

Cancel execution

Back to the example. Our service and the library have been rewritten on top of std::execution, achieving good utilization of computing resources. However, in operation we noticed that saving a compressed image to remote object storage sometimes hangs. To prevent network problems from stalling the rest of the work, it was decided to cancel the wait on the request after a timeout.

// Old version
sender auto result = 
    just(image)
    | store_to_s3();

// With a timeout
sender auto result = 
    just(image)
    | timeout(
        store_to_s3(),
        5s);

If store_to_s3 does not deliver a result within 5 seconds, its execution will be canceled. timeout is not part of the standard; the point is not the specific implementation, but the fact that the interface for stopping asynchronous work is now standardized. There is no longer any need to pass around global variables like “bool stop_running” or ad-hoc callbacks – all the necessary mechanisms have already been designed; the only thing left is to correctly handle the cancellation request in a specific algorithm (sender).

just – a sender that sends the value passed to its constructor.
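A self-contained illustration of the standardized stop channel (stdexec again): just_stopped is a sender that immediately completes as “stopped” – the same channel a real cancellation, such as the timeout above, would use – and upon_stopped converts that into an ordinary fallback value:

#include <stdexec/execution.hpp>

#include <cassert>
#include <utility>

int main() {
    // just_stopped() completes on the "stopped" channel right away;
    // upon_stopped turns that completion into a regular value.
    auto snd = stdexec::just_stopped()
             | stdexec::upon_stopped([] { return -1; });

    auto [v] = stdexec::sync_wait(std::move(snd)).value();
    assert(v == -1);
}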

Interacting with coroutines

Almost all awaitables are senders – in other words, we can use coroutines in algorithms that work with senders:

task<void> some_coro(int);

// when_all is a sender that runs the N passed senders concurrently
// and completes when the last of them finishes.
sync_wait(
    when_all(
        some_coro(1),
        some_coro(2)
    )
);

The opposite – awaiting a sender from a coroutine – also works, but not with every sender. The problem is that the sender interface is fundamentally more expressive than a coroutine's. The standard will include execution::with_awaitable_senders, which makes it relatively easy to add to your coroutine class (task) support for senders that send (in coroutine terms: return) exactly one value. If you do this, you can co_await your sender.
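A minimal sketch of what this looks like in practice, assuming exec::task from the stdexec repo (non-standard; its promise type is built on with_awaitable_senders):

#include <exec/task.hpp>
#include <stdexec/execution.hpp>

#include <iostream>

// just(x) sends exactly one value, so it can be awaited directly
// inside a coroutine whose promise uses with_awaitable_senders.
exec::task<int> add_one(int x) {
    int v = co_await stdexec::just(x);
    co_return v + 1;
}

int main() {
    // The coroutine itself is a sender, so sync_wait can run it.
    auto [r] = stdexec::sync_wait(add_one(41)).value();
    std::cout << r << '\n';  // prints 42
}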

(Example taken from the proposal)

// SingleValueSender is a non-standard concept
// checking that the sender sends exactly one value
template<class S>
  requires SingleValueSender<S&>
task<typename SenderReturnType<S>::type> retry(S sender) {
    for (;;) {
        try {
            co_return co_await sender;
        } catch(...) {
        }
    }
}

For the rest of the senders, you will either have to write the glue yourself or use execution::into_variant.

template<class S>
task<void> retry(S sender) {
    for (;;) {
        try {
            // if the sender sends (int, float), then the type of some_variant
            // will be variant<tuple<int, float>>
            auto some_variant = co_await execution::into_variant(sender);
          
            do_something(some_variant);
            co_return;
        } catch(...) {
        }
    }
}

Big example

This shows what an asynchronous pipeline for processing an http request might look like.

(Taken from stdexec)

#include <exec/async_scope.hpp>
#include <exec/static_thread_pool.hpp>
#include <stdexec/execution.hpp>

#include <format>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

enum class obj_type {
    human,
    dog,
    cat,
    /* other object types... */
    general_error,
    cancelled,
};

// The original example defines these types elsewhere; the fields below
// are reconstructed from how they are used further down.
struct http_request {
    std::string url_;
    std::vector<std::pair<std::string, std::string>> headers_;
    std::string body_;
};

struct http_response {
    int status_code_;
    std::string body_;
};

struct classification_result {
    obj_type type_;
    int confidence_;
    std::string details_;
};

struct image {};

// Helper implementations are omitted here.
// Extract the image from the request body
image extract_image(http_request req);
// Classify the object in the image
obj_type run_classifier(image);
// Convert obj_type to a human-readable string
std::string to_string(obj_type);
// Generate a base64-encoded image
std::string generate_body_with_image();

// Run the classification
classification_result do_classify(image img) {
    obj_type result = run_classifier(img);
    return {result, 0};
}

// Error handler; converts the exception into the same result type as do_classify
classification_result on_classification_error(std::exception_ptr) {
    return {obj_type::general_error, 100, {}};
}

// Cancellation handler; produces the same type as do_classify
classification_result on_classification_cancelled() {
    return {obj_type::cancelled, 100};
}

// Common handler, called for all three completion outcomes of do_classify;
// converts the result into a response to the request
http_response to_response(classification_result res) {
    switch(res.type_) {
        case(obj_type::general_error):
            return {500, res.details_};
        case(obj_type::cancelled):
            return {503, "cancelled"};
        default:
            return {200, to_string(res.type_)};
    }
}

// http_request handler
stdexec::sender auto handle_classify_request(const http_request& req) {
    return
        // just "sends" the http_request
        stdexec::just(req)
        // Read the image from the http_request
        | stdexec::then(extract_image)
        // Classify the image
        | stdexec::then(do_classify)
        // Error handler
        | stdexec::upon_error(on_classification_error)
        // Cancellation handler
        | stdexec::upon_stopped(on_classification_cancelled)
        // Convert the result into an http_response
        | stdexec::then(to_response);
}

int main() {
    exec::async_scope scope;
    exec::static_thread_pool pool{4};
    stdexec::scheduler auto sched = pool.get_scheduler();

    // Generate fake requests
    for (int i = 0; i < 12; i++) {
        http_request req{"/classify", {}, generate_body_with_image()};

        // A sender that handles the http_request and sends an http_response
        stdexec::sender auto snd = handle_classify_request(req);

        // "Ответ" на запрос
        stdexec::sender auto action = 
        std::move(snd) //
            | stdexec::then([](http_response resp) {
                std::cout << std::format(
                    "Sending response: {} / {}\n", 
                    resp.status_code_, 
                    resp.body_);
            });
        scope.spawn(stdexec::on(sched, std::move(action)));
    }

    stdexec::sync_wait(scope.on_empty());
    pool.request_stop();
}

Note the use of async_scope – a still non-standard extension (though word is that P3149 should also make it into C++26). The point of the component is that it lets you collapse a dynamic (unknown at compile time) number of units of work (senders) into one sender and shut the work down correctly. Conceptually it is similar to when_all, but when_all requires the set of senders to be known at compile time.
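The contrast in a nutshell – a runnable sketch whose spawn pattern mirrors the main() above:

#include <exec/async_scope.hpp>
#include <exec/static_thread_pool.hpp>
#include <stdexec/execution.hpp>

#include <atomic>
#include <iostream>

int main() {
    exec::static_thread_pool pool{2};
    auto sch = pool.get_scheduler();

    std::atomic<int> sum{0};
    exec::async_scope scope;

    // The number of spawned senders is only known at run time --
    // exactly the case a compile-time when_all cannot express.
    int n = 10;
    for (int i = 1; i <= n; ++i) {
        scope.spawn(stdexec::on(sch,
            stdexec::just(i) | stdexec::then([&sum](int v) { sum += v; })));
    }

    stdexec::sync_wait(scope.on_empty());
    std::cout << sum << '\n';  // prints 55
}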

What about implementations?

There are two main ones: stdexec from NVIDIA (used in the examples in this article) and libunifex from Meta.

Conclusion

This article covered only the part of the new functionality that users of libraries written on top of std::execution will see. There is a lot more interesting stuff under the hood, and you will have to deal with it if you want to write your own sender or scheduler.

In my opinion, the main disadvantage of std::execution is its exorbitant complexity. If the implementation of coroutines in C++20 seemed complicated to me, std::execution seems to have set out to surpass it. A huge number of new abstractions are introduced at once, and how they interact is far from obvious, especially if you have not been following this area for the last 10 years. The barrier to entry is very high even by C++ standards.

On the other hand, the model proposed by std::execution has already proven itself in production at top companies (at least Meta and NVIDIA) as performant and quite universal. It will be very interesting to see its level of adoption in the community in 10 years.

Interested?

To stay up to date with news and interesting developments in the C++ world, subscribe to my Telegram channel.
