How to build a WaitGroup based on a 32-bit integer


Go has a nice synchronization utility called WaitGroup, which one or more goroutines can wait on. It is used to wait for the concurrent completion of tasks. Other languages generally use a different convention for this: joining the threads doing the work. In Go, goroutines have neither values nor handles, so a WaitGroup is used in place of joins. Assembling a WaitGroup out of typical portable primitives is a messy business involving constructors and destructors, and lifetimes have to be managed on top of that. However, at least on Linux and on Windows, it is possible to construct a WaitGroup from a zero-initialized integer, much like a 32-bit queue or a 32-bit barrier.

In case you are not familiar with it, typical WaitGroup usage in Go looks like this:

var wg sync.WaitGroup
for _, task := range tasks {
    wg.Add(1)
    go func(t Task) {
        // ... perform the task ...
        wg.Done()
    }(task)
}
wg.Wait()

The WaitGroup is initialized to zero, and the main goroutine increments the counter by one before starting each task goroutine. When a goroutine finishes its task, it decrements the counter by one, while the main goroutine waits until the counter reaches zero. My goal was to implement the same mechanism in C:

void workfunc(task t, int *wg)
{
    // ... perform the task ...
    waitgroup_done(wg);
}


int main(void)
{
    // ...
    int wg = 0;
    for (int i = 0; i < ntasks; i++) {
        waitgroup_add(&wg, 1);
        go(workfunc, tasks[i], &wg);
    }
    waitgroup_wait(&wg);
    // ...
}

Once it’s done, the WaitGroup returns to zero, and no cleanup is required.

Next, I'm going to expand on this example a bit. Because the WaitGroup's count is explicit, you can initialize a WaitGroup with any non-negative number of tasks! In other words, waitgroup_add is optional when the total number of tasks is known in advance.


    int wg = ntasks;
    for (int i = 0; i < ntasks; i++) {
        go(workfunc, tasks[i], &wg);
    }
    waitgroup_wait(&wg);

If you want to see the full code, see waitgroup.c.

The four primitives

To build such a WaitGroup, we need four primitives from the host platform, each of them operating on an int. The first two are atomic operations, and the other two interact with the system scheduler. To port the WaitGroup to a platform, all you need to do is implement these four functions; as a rule, each of them fits in one line.

static int load(int *); // atomic load
static int addfetch(int *, int); // atomic add-then-fetch
static void wait(int *, int); // wait for a change at the address
static void wake(int *); // wake all waiters at the address

The first two functions should be self-evident. The wait function waits for the pointed-to integer to change its value, and its second argument is the expected current value. The scheduler double-checks the integer before putting the thread to sleep, in case the value changed at the last moment. In other words, this is an atomic "check, then maybe sleep" operation. The wake function is responsible for the second half of the job: after changing the integer, a thread uses it to wake all other threads that were waiting for a change at that address. This whole mechanism is generally known as a futex.

I'm going to simplify the WaitGroup semantics a bit to make my implementation even easier. Go's WaitGroup allows negative numbers to be added, so its Add method essentially does double duty. In my version, adding negative numbers is prohibited, so the "add" operation is just an atomic addition:

void waitgroup_add(int *wg, int delta)
{
    addfetch(wg, delta);
}

Since "add" cannot bring the counter to zero, nothing more needs to be done there. The "done" operation, however, may decrement the counter to zero:

void waitgroup_done(int *wg)
{
    if (!addfetch(wg, -1)) {
        wake(wg);
    }
}

If the count reached zero during the atomic decrement, it means we have just finished the last task, so the waiters must be woken. We don't know whether anyone is actually waiting, but that's fine. Some futex use cases can avoid the relatively expensive system call when there are no waiters, for example by not making a system call for every uncontended mutex unlock. But in the typical WaitGroup use case, we expect a waiter to be present when the count finally reaches zero. That is the common case.

The most complex of the three operations is waiting:

void waitgroup_wait(int *wg)
{
    for (;;) {
        int c = load(wg);
        if (!c) {
            break;
        }
        wait(wg, c);
    }
}

First, we check whether the count is already zero, and if so, we return. Otherwise, we use the futex to wait for the count to change. Unfortunately, that is not quite the semantics we want, which is the ability to wait for a particular target value. It doesn't break the wait, but it is potentially inefficient: if another task finishes between our load and wait, we won't go to sleep and will instead take another lap around the loop. But in practice, I've concurrently run thousands of threads through this thing and never observed such a "miss." As far as I can tell, the case is rare enough to ignore.

If this really bothered us, the WaitGroup could be built from a pair of integers: a counter and a latch that is either 0 or 1. Waiters wait on the latch, and the latch itself is (atomically) updated whenever the counter reaches zero or moves off zero. That gives waiters a stable value to wait on, with the latch mediating access to the counter. But since this doesn't seem to matter in practice, I prefer the elegance and simplicity of a single-integer WaitGroup.

The four primitives: Linux

With the WaitGroup done generically, we need to supply the platform-specific parts. Both GCC and Clang support GNU-style atomics, so I won't worry about the compiler and will simply assume they are available on Linux. The first two functions wrap these built-ins:

static int load(int *p)
{
    return __atomic_load_n(p, __ATOMIC_SEQ_CST);
}


static int addfetch(int *p, int addend)
{
    return __atomic_add_fetch(p, addend, __ATOMIC_SEQ_CST);
}

For wait and wake we need the futex(2) system call. As a hint that you aren't meant to use it directly, glibc doesn't wrap this system call in a function, so we have to make the system call ourselves.

static void wait(int *p, int current)
{
    syscall(SYS_futex, p, FUTEX_WAIT, current, 0, 0, 0);
}


static void wake(int *p)
{
    syscall(SYS_futex, p, FUTEX_WAKE, INT_MAX, 0, 0, 0);
}

INT_MAX means "wake as many as you can." Another common value is 1, meaning "wake a single waiter." These system calls cannot fail in any informative way, so the return values need not be checked either. If wait wakes up early (e.g. EINTR), the caller re-checks the counter anyway. In fact, even on a kernel so old (20+ years) that it predates futexes, the call simply returns ENOSYS ("function not implemented"), and everything still works correctly, albeit extremely inefficiently.

The four primitives: Windows

Windows did not support futexes until Windows 8, released in 2012. As of 2020, versions of Windows without futexes were still supported, so futexes are relatively new on this platform. However, they are mature enough that we can consider them ready to use.

I'd like to support both GCC (via mingw-w64) and MSVC. mingw-w64 provides a compatible intrin.h, so I can use the MSVC-style atomics and cover both at once. On the other hand, MSVC requires that these atomics operate on long rather than int (or even int32_t), so I have to do a little casting here. (Remember: sizeof(long) == sizeof(int) on all versions of Windows that support futexes.) Another option would be to define the WaitGroup type with a typedef, making it an int on Linux (for futexes) and a long on Windows (for the atomics).

static int load(int *p)
{
    return _InterlockedOr((long *)p, 0);
}


static int addfetch(int *p, int addend)
{
    /* _InterlockedExchangeAdd returns the old value, so add addend
       to produce the new value, matching __atomic_add_fetch */
    return addend + _InterlockedExchangeAdd((long *)p, addend);
}

The officially sanctioned futex functions are WaitOnAddress and WakeByAddressAll. They would be at home in kernel32.dll, but at the time this article was originally written they live in API-MS-Win-Core-Synch-l1-2-0.dll, linked via -lsynchronization. Ugly. Since I can't stomach that, I instead call the low-level RTL functions where they are actually implemented: RtlWaitOnAddress and RtlWakeAddressAll. They live in ntdll.dll, which is very convenient. As far as I know, they are undocumented, but fortunately Wine comes to the rescue, providing not only documentation for them but also several implementation variants. Reading that material is instructive: it shows how futexes can be constructed on systems that don't yet have them.

These functions aren't declared in any header, so I had to take care of that myself. The good news is that so far I've managed to avoid the cost of including windows.h, and I can continue without it. The functions are also listed in the import library for ntdll.dll, so I don't have to craft such an import library myself.

__declspec(dllimport)
long __stdcall RtlWaitOnAddress(void *, void *, size_t, void *);
__declspec(dllimport)
long __stdcall RtlWakeAddressAll(void *);

Conveniently enough, their semantics match the semantics of Linux futexes perfectly!

static void wait(int *p, int current)
{
    RtlWaitOnAddress(p, &current, sizeof(*p), 0);
}


static void wake(int *p)
{
    RtlWakeAddressAll(p);
}

As in the case of Linux, there is no informative failure here, so the return values are of no interest to us.

That's the whole implementation. If you only need one platform at a time, you get a flexible, lightweight, and easy-to-use synchronization tool. That's quite impressive, in my opinion, considering it fits in about 50 lines of relatively simple code!
