What is a Thread Pool and How to Write One Yourself in C++

Who is the article for?

This article is for those who want to understand how a Thread Pool works and write a naive implementation using C++14 and C++17. Note that the presented implementations solve an educational problem and are not suitable for production use.

What can I learn from the article?

What is a Thread Pool?

A Thread Pool is a convenient pattern that lets you execute multiple tasks using the resources of several threads. A Thread Pool usually consists of a task queue and several threads that take tasks from the queue and execute them in parallel. There are also implementations with a separate queue per thread, but we will not consider them.

Why use Thread Pool?

  • Helps increase program performance by creating threads only once (creating a thread is considered a fairly expensive operation).

  • Provides a convenient interface for working with multithreading.

Some numbers

I tested three cases: running without additional threads, creating threads via std::thread, and using thread_pool, on the following program:

void test_func(int& res, const std::vector<int>& arr) {
    res = 0;
    for (int i = static_cast<int>(arr.size()) - 1; i >= 0; --i) {
        for (size_t j = 0; j < arr.size(); ++j) {
            res += arr[i] + arr[j];
        }
    }
}

We got the following results:

| Launch method         | Time (milliseconds) | Number of threads |
|-----------------------|---------------------|-------------------|
| No additional threads | 83954               | 1                 |
| std::thread           | 62386               | 6                 |
| thread_pool           | 52474               | 6                 |

My machine can run a maximum of 8 threads, and I ran the test case in Visual Studio on Windows. This means that background work by third-party applications can cause fluctuations, so we will get different times on every run. The example code can be seen HERE (the run_test function).

Why didn’t 6 threads speed the code up 6 times?

The fact is that the Thread Pool in my implementation is not optimal: it uses a condition_variable, which slows the work down considerably, and there are synchronization primitives as well (remember Amdahl’s law). On top of that, testing was done on Windows, where other applications may contend for shared PC resources.

Okay, but how does a Thread Pool actually work?

A Thread Pool has a task queue from which each thread takes a new task, provided that the queue is not empty and the thread is free. For a more detailed description, let’s walk through the work of the Thread Pool with an example:

Initial stage: all threads are free, and there are 5 tasks in the queue.

Stage 1: Each of the threads takes a task to execute; in practice the first thread does not necessarily take the first task, it depends on which thread gets access to the shared resource (the queue) first. Only tasks 4 and 5 remain in the queue (tasks still in the queue are marked in black).

Stage 2: At the 3-second mark, the second thread completes task 2 and takes the first available task from the queue (task 4). Only task 5 remains in the queue.

Stage 3: The third thread completes task 3 and takes the last task from the queue (task 5). The queue is now empty, but our program should not exit yet; first we must wait for all tasks to complete.

Stage 4: The first thread has finished executing its task and does not take a new one (the queue is empty). It may seem that once the queue is empty, an idle thread will never execute anything again. In fact, this is not the case: as soon as a new task arrives, some free thread (we don’t know which one) will immediately begin executing it.

Stage 5: The third thread has finished executing task 5.

Stage 6: The second thread has finished executing task 4.

Outcome: the Thread Pool completed 5 tasks in 11 seconds.

What will our Thread Pool be able to do?

We have already dealt with the general mechanism of the Thread Pool; now let’s think about its functionality.

Our Thread Pool will have two kinds of methods: blocking (they stall the calling thread until the operation completes) and non-blocking (guaranteed to execute very quickly without stalling the calling thread).

Thread Pool will have the following interface:

  • init(num_threads) – a method that creates an array of num_threads threads. In our implementation this method will be the constructor.

  • add_task(task_func, args) – a non-blocking method for adding a new task. It accepts a function task_func and that function's arguments args, and returns a task_id (a unique task number).

  • wait(task_id) – a blocking method that waits for the task with the specified task_id. In this implementation we will not save the function's result (we will fix this shortcoming a little later), so the function must return void.

  • wait_all() – a blocking method that waits for all tasks to complete.

  • calculated(task_id) – a non-blocking method that checks whether the task with the given task_id has completed.

  • shutdown() – a blocking method that waits for all tasks to finish and terminates the Thread Pool. To terminate the program correctly we will use the destructor (although you could optionally add a separate method).

Basic Version Implementation (C++14)

Consider the member variables that will be used in our class:

// task queue: stores the function (task) to execute and the task id
std::queue<std::pair<std::future<void>, int64_t>> q;

std::mutex q_mtx;
std::condition_variable q_cv;

// ids of completed tasks are placed into this container
std::unordered_set<int64_t> completed_task_ids;

std::condition_variable completed_task_ids_cv;
std::mutex completed_task_ids_mtx;

std::vector<std::thread> threads;

// thread_pool shutdown flag
std::atomic<bool> quite{ false };

// the id that will be assigned to the next task
std::atomic<int64_t> last_idx{ 0 };

The queue stores std::future<void>, an object that will yield a value of type void in the future. Using std::future lets us not compute the function immediately but postpone the call until the moment we need it; you could also use std::function<void()> (that approach is also acceptable).

thread_pool(uint32_t num_threads) {
    threads.reserve(num_threads);
    for (uint32_t i = 0; i < num_threads; ++i) {
        threads.emplace_back(&thread_pool::run, this);
    }
}

In the constructor, we create the specified number of threads, and each thread runs the single private method run.

void run() {
    while (!quite) {
        std::unique_lock<std::mutex> lock(q_mtx);

        // if there are tasks, take one; otherwise go to sleep.
        // if the destructor has been entered, quite is true, so we
        // do not wait for all tasks to finish and exit the loop
        q_cv.wait(lock, [this]()->bool { return !q.empty() || quite; });

        if (!q.empty()) {
            auto elem = std::move(q.front());
            q.pop();
            lock.unlock();

            // evaluate the std::future object (run the function)
            elem.first.get();

            std::lock_guard<std::mutex> lock(completed_task_ids_mtx);

            // add the id of the finished task to the completed set
            completed_task_ids.insert(elem.second);

            // notify to wake up threads waiting in wait()/wait_all()
            completed_task_ids_cv.notify_all();
        }
    }
}

In the wait method, the condition_variable (q_cv) acquires the mutex and checks the predicate; if the predicate is true, execution continues, otherwise the thread goes to sleep, releases the mutex, and waits for a notify call from the add_task method (after the notify, the procedure repeats: the mutex is reacquired and the predicate checked again). Thus, we take tasks until they run out, and when the queue is empty and a new task arrives, a sleeping thread is woken up.

template <typename Func, typename ...Args>
int64_t add_task(const Func& task_func, Args&&... args) {
    // get the index for the new task
    int64_t task_idx = last_idx++;

    std::lock_guard<std::mutex> q_lock(q_mtx);
    q.emplace(std::async(std::launch::deferred, task_func, args...), task_idx);

    // notify_one to wake up one sleeping thread (if there is one)
    // inside the run method
    q_cv.notify_one();
    return task_idx;
}

Despite its name, std::async(std::launch::deferred, task_func, args...) does nothing asynchronously, thanks to the std::launch::deferred parameter. We just memorize the function's arguments, as with std::bind. The only difference is that std::bind does not require all of the function's arguments to be supplied, unlike std::async.

void wait(int64_t task_id) {
    std::unique_lock<std::mutex> lock(completed_task_ids_mtx);

    // wait for the notify in run (fires after a task completes)
    completed_task_ids_cv.wait(lock, [this, task_id]()->bool {
        return completed_task_ids.find(task_id) != completed_task_ids.end();
    });
}

void wait_all() {
    std::unique_lock<std::mutex> lock(q_mtx);

    // wait for the notify in run (fires after a task completes)
    completed_task_ids_cv.wait(lock, [this]()->bool {
        std::lock_guard<std::mutex> task_lock(completed_task_ids_mtx);
        return q.empty() && last_idx == completed_task_ids.size();
    });
}

Note that wait_all takes another lock inside its predicate to check the queue for emptiness (we must lock every shared resource to avoid a data race).

Also note that std::lock_guard is used wherever we neither wait on a condition variable nor need to unlock manually (std::unique_lock is used in the other cases). If you stick to this rule, programmers reading your code will thank you.

bool calculated(int64_t task_id) {
    std::lock_guard<std::mutex> lock(completed_task_ids_mtx);
    return completed_task_ids.find(task_id) != completed_task_ids.end();
}

This non-blocking method checks a task for completion: it returns true if the task with the given task_id has already been calculated, otherwise false.

~thread_pool() {
    // you could call wait_all() here if you need to wait for all
    // queued tasks before destruction
    quite = true;
    for (uint32_t i = 0; i < threads.size(); ++i) {
        q_cv.notify_all();
        threads[i].join();
    }
}

When a thread_pool instance is destroyed, the destructor waits for all threads to finish. If there are still tasks in the queue, each thread will execute at most one more task before terminating (this behavior can be changed, for example, to wait for all tasks to complete first).

You can see the full code of this implementation HERE.

An example of working with Thread Pool

void sum(int& ans, std::vector<int>& arr) {
    for (int i = 0; i < arr.size(); ++i) {
        ans += arr[i];
    }
}

int main() {
    thread_pool tp(3);
    std::vector<int> s1 = { 1, 2, 3 };
    int ans1 = 0;
    
    std::vector<int> s2 = { 4, 5 };
    int ans2 = 0;
    
    std::vector<int> s3 = { 8, 9, 10 };
    int ans3 = 0;
		
    // add 3 tasks to the thread_pool
    auto id1 = tp.add_task(sum, std::ref(ans1), std::ref(s1));
    auto id2 = tp.add_task(sum, std::ref(ans2), std::ref(s2));
    auto id3 = tp.add_task(sum, std::ref(ans3), std::ref(s3));

    if (tp.calculated(id1)) {
        // if the result is already computed, just print the answer
        std::cout << ans1 << std::endl;
    }
    else {
        // if the result is not ready yet, wait for it
        tp.wait(id1);
        std::cout << ans1 << std::endl;
    }
    tp.wait_all();

    std::cout << ans2 << std::endl;
    std::cout << ans3 << std::endl;
    return 0;
}

It is worth paying attention to std::ref: thanks to it, a reference is passed rather than a copy of the object (this is a feature of argument passing with std::async/std::future).

Here is a fairly simple example of working with Thread Pool. Let’s take a look at this little code snippet and see what we can improve.

| No. | Flaw | Consequences |
|-----|------|--------------|
| 1 | The function must return void | You have to change a function's signature if it returned some value |
| 2 | You have to store an additional variable for the answer | If we need several values from the thread_pool, we have to carry all of these variables around. What if we need 100 values or more? |

Unfortunately, I was not able to solve these problems with C++14, but C++17 allowed me to get rid of the shortcomings above.

Improving Thread Pool with C++17

To improve our version, we first need to understand the main problem: finding out the return type of the function while being able to put the calculation result (possibly of a different type for each task) into one object. This is where std::any comes to the rescue.

Now we can store std::function<std::any()> in our queue (std::future<std::any> would not work here). This is exactly what I did on my first try, and I got quite elegant code that did not differ much from the original implementation, but then I ran into the problem that std::any cannot hold void. So I decided to create a Task class that stores std::function<std::any()> in one case and std::function<void()> in the other. Consider its constructor:

template <typename FuncRetType, typename ...Args, typename ...FuncTypes>
Task(FuncRetType(*func)(FuncTypes...), Args&&... args) :
    is_void{ std::is_void_v<FuncRetType> } {

    if constexpr (std::is_void_v<FuncRetType>) {
        void_func = std::bind(func, args...);
        any_func = []()->int { return 0; };
    }
    else {
        void_func = []()->void {};
        any_func = std::bind(func, args...);
    }
}

We use if constexpr so that only one branch of the condition is compiled. With a regular if, when given a function that returns void, the compiler would try to convert void to std::any and we would get a type conversion error, even though that cast occurs in the other branch of the condition.

We use both typename ...Args and typename ...FuncTypes so that implicit conversion between std::reference_wrapper and a reference type works; then we don't have to write std::reference_wrapper explicitly in the function signatures.

any_func = []()->int { return 0; }; and void_func = []()->void {}; are stub functions. They let us get rid of an extra condition when computing a value:

void operator() () {
    void_func();
    any_func_result = any_func();
}

has_result checks whether the function returns a value, and get_result retrieves it.

bool has_result() {
    return !is_void;
}

std::any get_result() const {
    assert(!is_void);
    assert(any_func_result.has_value());
    return any_func_result;
}

Another helper class: TaskInfo:

enum class TaskStatus {
    in_q,
    completed
};

struct TaskInfo {
    TaskStatus status = TaskStatus::in_q;
    std::any result;
};

This structure stores information about a task: its status and a possible result. If the task returns void, the result field remains empty.

Consider the private fields of the thread_pool class:

std::vector<std::thread> threads;

// queue of (task, task id) pairs
std::queue<std::pair<Task, uint64_t>> q;

std::mutex q_mtx;
std::condition_variable q_cv;

// a key is created as soon as a new task arrives, and its status
// is updated on completion
std::unordered_map<uint64_t, TaskInfo> tasks_info;

std::condition_variable tasks_info_cv;
std::mutex tasks_info_mtx;

std::condition_variable wait_all_cv;

std::atomic<bool> quite{ false };
std::atomic<uint64_t> last_idx{ 0 };

// counter of completed tasks
std::atomic<uint64_t> cnt_completed_tasks{ 0 };

Unlike the previous implementation, we need the variable cnt_completed_tasks to count completed tasks (previously we had a separate container of completed tasks and obtained their number from its size). This variable is used in the wait_all function to determine that all tasks have completed.

We will also separately consider 3 different functions for waiting for the result:

void wait(const uint64_t task_id) {
    std::unique_lock<std::mutex> lock(tasks_info_mtx);
    tasks_info_cv.wait(lock, [this, task_id]()->bool {
        return task_id < last_idx && tasks_info[task_id].status == TaskStatus::completed;
    });
}

std::any wait_result(const uint64_t task_id) {
    wait(task_id);
    return tasks_info[task_id].result;
}

template<class T>
void wait_result(const uint64_t task_id, T& value) {
    wait(task_id);
    value = std::any_cast<T>(tasks_info[task_id].result);
}

  • void wait(const uint64_t task_id) – used to wait for a task that returns void.

  • std::any wait_result(const uint64_t task_id) and void wait_result(const uint64_t task_id, T& value) return results in different ways.

std::any wait_result(const uint64_t task_id) returns an std::any, and the user has to cast it to the correct type. The template function void wait_result(const uint64_t task_id, T& value) takes a reference to a variable as its second argument, places the new value there, and no explicit cast is required from the user.

The rest of the code is very similar to the previous version; you can find the code of the new version HERE.

Using Thread Pool (C++17)

int int_sum(int a, int b) {
    return a + b;
}

void void_sum(int& c, int a, int b) {
    c = a + b;
}

void void_without_argument() {
    std::cout << "It's OK!" << std::endl;
}

int main() {
    thread_pool t(3);
    int c;
    t.add_task(int_sum, 2, 3);               // id = 0
    t.add_task(void_sum, std::ref(c), 4, 6); // id = 1
    t.add_task(void_without_argument);       // id = 2

    {
        // variant 1
        int res;
        t.wait_result(0, res);
        std::cout << res << std::endl;

        // variant 2
        std::cout << std::any_cast<int>(t.wait_result(0)) << std::endl;
    }

    t.wait(1);
    std::cout << c << std::endl;

    t.wait_all(); // waiting for task with id 2

    return 0;
}

This example shows two ways to obtain a value through the wait_result function. I personally prefer variant 2. Even though you need a cast, you get a compact solution, and you can also catch and handle an exception in case of an error.

Have we really got a version better than the previous one?

Yes and no. After analysis, I got the following results:

| Type of argument passed when creating a new task | thread_pool C++14 | thread_pool C++17 |
|--------------------------------------------------|-------------------|-------------------|
| function returning void                          | +                 | +                 |
| function returning anything except void          |                   | +                 |
| std::bind                                        | +                 |                   |
| functor                                          | +                 |                   |

An example with a functor and with std::bind:

class Test {
public:
    void operator() () {
        std::cout << "Working with functors!\n";
    }
};

void sum(int a, int b) {
    std::cout << a + b << std::endl;
}

int main() {
    Test test;
    auto res = std::bind(sum, 2, 3);

    thread_pool t(3); // C++ 14
    t.add_task(test);
    t.add_task(res);
    t.wait_all();

    return 0;
}

Why didn’t it work out better?

Initially, the idea was to implement a Thread Pool that could itself determine the type of the returned value and, based on that type, build a Task object, but the return type of a std::bind result cannot be obtained explicitly through std::invoke_result, so I had to make some concessions.

Outcome

We got two different versions of thread_pool. It’s hard to say which one is better. I personally prefer the C++17 version: it doesn't make you carry around many variables as references for results, but stores everything inside itself. Yes, this version is inferior in functionality, but using functors and std::bind is not a frequent practice, so this is the option I consider the best.
