How to build a WaitGroup based on a 32-bit integer

Go has a nice synchronization utility called WaitGroup, which one or more goroutines can wait on. It is used to await the completion of concurrent tasks. Other languages generally handle this by joining the threads doing the work. Go goroutines have neither values nor handles, so a WaitGroup is used instead of joining. Building a WaitGroup from typical portable primitives is a messy business involving constructors and destructors, and lifetimes must be managed. However, at least on Linux and on Windows, it is possible to construct a WaitGroup from a zero-initialized integer, much like building a 32-bit queue or a 32-bit barrier.
In case you are not familiar with it, here is the typical use of a WaitGroup in Go:
var wg sync.WaitGroup
for _, task := range tasks {
    wg.Add(1)
    go func(t Task) {
        // ... perform the task ...
        wg.Done()
    }(task)
}
wg.Wait()
The WaitGroup starts at zero, and the main goroutine increments the counter by one before starting each task goroutine. When a goroutine finishes its task, it decrements the counter by one, while the main goroutine waits for the counter to reach zero. My goal was to implement the same mechanism in C:
void workfunc(task t, int *wg)
{
    // ... perform the task ...
    waitgroup_done(wg);
}

int main(void)
{
    // ...
    int wg = 0;
    for (int i = 0; i < ntasks; i++) {
        waitgroup_add(&wg, 1);
        go(workfunc, tasks[i], &wg);
    }
    waitgroup_wait(&wg);
    // ...
}
Once it’s done, the WaitGroup returns to zero, and no cleanup is required.
Next, a small twist on this example. Because the WaitGroup's value and context are explicit, you can initialize it with any non-negative task count! In other words, waitgroup_add is optional if the total number of tasks is known in advance.
int wg = ntasks;
for (int i = 0; i < ntasks; i++) {
    go(workfunc, tasks[i], &wg);
}
waitgroup_wait(&wg);
If you want to see how the full thing fits together, read on.
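Before diving into the platform details, here is a rough end-to-end sketch of how the pieces can fit together on Linux. This is my own assembly, not code from the article: the squaring workfunc and the results array are hypothetical stand-ins for the article's task plumbing, threads are spawned with raw pthreads rather than a go() helper, and the wait primitive is renamed wait_change only to avoid any clash with POSIX wait().

```c
#include <limits.h>
#include <linux/futex.h>
#include <pthread.h>
#include <sys/syscall.h>
#include <unistd.h>

/* The four platform primitives, Linux edition */
static int load(int *p) { return __atomic_load_n(p, __ATOMIC_SEQ_CST); }
static int addfetch(int *p, int n) { return __atomic_add_fetch(p, n, __ATOMIC_SEQ_CST); }
static void wait_change(int *p, int c) { syscall(SYS_futex, p, FUTEX_WAIT, c, 0, 0, 0); }
static void wake(int *p) { syscall(SYS_futex, p, FUTEX_WAKE, INT_MAX, 0, 0, 0); }

/* The three WaitGroup operations */
void waitgroup_add(int *wg, int delta)
{
    addfetch(wg, delta);
}

void waitgroup_done(int *wg)
{
    if (!addfetch(wg, -1)) {
        wake(wg);  /* last task finished: wake the waiters */
    }
}

void waitgroup_wait(int *wg)
{
    for (;;) {
        int c = load(wg);
        if (!c) {
            break;
        }
        wait_change(wg, c);
    }
}

/* Hypothetical task plumbing: each worker squares its index. */
static int results[64];
static int wg;

static void *workfunc(void *arg)
{
    int i = (int)(long)arg;
    results[i] = i * i;    /* ... perform the task ... */
    waitgroup_done(&wg);
    return 0;
}
```

Spawning detached pthreads that call workfunc, initializing wg to the task count up front, and then calling waitgroup_wait reproduces the Go pattern above.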
❯ Four elements
To build such a WaitGroup, we need four primitives from the host platform, each operating on an int. The first two are atomic operations, and the other two interact with the system scheduler. To port the WaitGroup to a new platform, all you need to do is implement these four functions; as a rule, each of them fits on one line.
static int load(int *);          // atomic load
static int addfetch(int *, int); // atomic add, then fetch
static void wait(int *, int);    // wait for a change at the address
static void wake(int *);         // wake all waiters at the address
The first two functions should be self-explanatory. The wait function waits for the pointed-to integer to change, and its second argument is the value it expects the integer to currently hold. The scheduler re-checks the integer before putting the thread to sleep, in case the value changed at the last moment. In other words, it is an atomic "check, then maybe sleep" operation. The wake function handles the other half: after changing the integer, a thread uses it to wake all the threads waiting on a change at that address. Together, this mechanism is known as a futex.
I am going to simplify the WaitGroup semantics a bit to make my implementation even easier. Go's WaitGroup allows adding negative deltas, so its Add method essentially does double duty. In my version, negative deltas are prohibited, so the "add" operation is just an atomic addition:
void waitgroup_add(int *wg, int delta)
{
    addfetch(wg, delta);
}
Since this cannot bring the counter to zero, nothing more needs to be done. The "done" operation, however, might decrement the counter to zero:
void waitgroup_done(int *wg)
{
    if (!addfetch(wg, -1)) {
        wake(wg);
    }
}
If the atomic decrement brought the count to zero, we have just finished the last task, which means it is time to wake the waiters. We do not know whether anyone is actually waiting, but that is fine. Some futex use cases avoid the relatively expensive system call when there are no waiters, such as skipping the system call on every uncontended mutex unlock. But in the typical WaitGroup use case we expect a waiter to be present when the count finally reaches zero, so that is the common case.
The most complex of the three operations is waiting:
void waitgroup_wait(int *wg)
{
    for (;;) {
        int c = load(wg);
        if (!c) {
            break;
        }
        wait(wg, c);
    }
}
First we check whether the count is already zero, and if so, return. Otherwise we use the futex to wait for the count to change. Unfortunately, that is not quite the semantics we want: we would rather wait for a particular target value. It does not break correctness, but it is potentially inefficient. If the count changes between our load and our wait, we will not go to sleep and will instead take another lap around the loop. In practice, I have concurrently run thousands of threads through this thing and never observed such a "miss." As far as I can tell, the case is rare enough to ignore.
If it really bothered us, the WaitGroup could be built from a pair of integers: a counter and a latch that is either 0 or 1. Waiters wait on the latch, and the latch is (atomically) updated when the counter reaches zero or moves away from zero. That way the waiters get a stable value to wait on, with the latch mediating the counter. But since this does not seem to matter in practice, I prefer the elegance and simplicity of a single-integer WaitGroup.
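If it did matter, the two-integer variant described above might be sketched like this. This is my own sketch, not code from the article: the latch-update logic in waitgroup2_add is simplified and not fully race-free under concurrent add and done, and the wait primitive is renamed wait_change only to avoid clashing with POSIX wait().

```c
#include <limits.h>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

/* The same four Linux primitives used throughout the article */
static int load(int *p) { return __atomic_load_n(p, __ATOMIC_SEQ_CST); }
static int addfetch(int *p, int n) { return __atomic_add_fetch(p, n, __ATOMIC_SEQ_CST); }
static void wait_change(int *p, int c) { syscall(SYS_futex, p, FUTEX_WAIT, c, 0, 0, 0); }
static void wake(int *p) { syscall(SYS_futex, p, FUTEX_WAKE, INT_MAX, 0, 0, 0); }

/* Counter plus latch: waiters only ever wait on the latch, which
 * holds a stable 0 or 1, so the load/wait "miss" cannot occur. */
struct waitgroup2 { int count; int latch; };

void waitgroup2_add(struct waitgroup2 *wg, int delta)
{
    if (addfetch(&wg->count, delta) == delta) {
        /* count moved away from zero: raise the latch */
        __atomic_store_n(&wg->latch, 1, __ATOMIC_SEQ_CST);
    }
}

void waitgroup2_done(struct waitgroup2 *wg)
{
    if (!addfetch(&wg->count, -1)) {
        /* count reached zero: drop the latch and wake the waiters */
        __atomic_store_n(&wg->latch, 0, __ATOMIC_SEQ_CST);
        wake(&wg->latch);
    }
}

void waitgroup2_wait(struct waitgroup2 *wg)
{
    while (load(&wg->latch)) {
        wait_change(&wg->latch, 1);
    }
}
```

The price is a second integer and extra stores, which is exactly the complexity the single-integer version avoids.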
❯ Four elements: Linux
With the platform-independent part of the WaitGroup done, we need the platform-specific pieces. Both GCC and Clang support the __atomic builtins, so I will not worry about the compiler and will simply assume they are available on Linux. The first two functions wrap these builtins:
static int load(int *p)
{
    return __atomic_load_n(p, __ATOMIC_SEQ_CST);
}

static int addfetch(int *p, int addend)
{
    return __atomic_add_fetch(p, addend, __ATOMIC_SEQ_CST);
}
For wait and wake we need the futex system call. In an attempt to signal that you should not use it directly, glibc does not wrap this system call in a function, so we have to make the raw system call ourselves.
#include <limits.h>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

static void wait(int *p, int current)
{
    syscall(SYS_futex, p, FUTEX_WAIT, current, 0, 0, 0);
}

static void wake(int *p)
{
    syscall(SYS_futex, p, FUTEX_WAKE, INT_MAX, 0, 0, 0);
}
INT_MAX means "wake as many as you can." Another common value is 1, meaning "wake a single waiter." These system calls also cannot fail in an informative way, so the return value does not need to be checked. If wait wakes up early (e.g. EINTR), the caller will re-check the counter anyway. In fact, if your kernel is over 20 years old and predates futexes, the call will simply return ENOSYS ("function not implemented"). Even then everything still works correctly, albeit extremely inefficiently, since the wait loop degrades to a busy spin.
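To see why correctness survives a wait that returns immediately, here is a small sketch of my own where wait_change is deliberately a no-op (as if every futex call failed with ENOSYS), turning waitgroup_wait into a spin loop that still terminates; the finish_later helper is a hypothetical stand-in for a worker thread.

```c
#include <pthread.h>
#include <unistd.h>

static int load(int *p) { return __atomic_load_n(p, __ATOMIC_SEQ_CST); }

/* Pretend the futex syscall failed with ENOSYS: do nothing. */
static void wait_change(int *p, int c) { (void)p; (void)c; }

void waitgroup_wait(int *wg)
{
    for (;;) {
        int c = load(wg);
        if (!c) {
            break;
        }
        wait_change(wg, c);  /* returns immediately; the loop spins */
    }
}

/* Hypothetical worker that finishes its "task" a moment later. */
static void *finish_later(void *arg)
{
    usleep(10000);  /* simulate 10 ms of work */
    __atomic_store_n((int *)arg, 0, __ATOMIC_SEQ_CST);
    return 0;
}
```

The loop burns CPU for those 10 ms instead of sleeping, but the observable behavior is identical: waitgroup_wait returns exactly when the count hits zero.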
❯ Four elements: Windows
Windows did not support futexes until Windows 8, released in 2012. As of 2020, versions of Windows without futexes were still supported, so futexes are still relatively new on this platform. However, they are mature enough that we can consider them ready to use.
I would like this to work with both GCC (via mingw-w64) and MSVC. mingw-w64 provides a compatible intrin.h, so I can stick to the MSVC-style interlocked intrinsics and cover both at once. On the other hand, MSVC's interlocked atomics operate on long rather than int (or even int32_t), so a little casting is needed. (Remember: sizeof(long) == sizeof(int) on all versions of Windows that support futexes.) Another option is to typedef the WaitGroup type so that it comes out as int on Linux (for futexes) and long on Windows (for the atomics).
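That typedef option could be spelled out like this; a hypothetical sketch, with wg_int being my name rather than anything from the article.

```c
/* Hypothetical portable counter type for the WaitGroup. */
#ifdef _WIN32
typedef long wg_int;  /* the _Interlocked* intrinsics operate on long */
#else
typedef int wg_int;   /* Linux futexes operate on a 32-bit int */
#endif

/* Either way the counter must be 32 bits wide, as futexes require. */
_Static_assert(sizeof(wg_int) == 4, "WaitGroup counter must be 32 bits");

/* The API would then use wg_int throughout: */
void waitgroup_add(wg_int *, int);
void waitgroup_done(wg_int *);
void waitgroup_wait(wg_int *);
```

This trades a slightly uglier API for casts-free platform code; the article's cast-based approach keeps plain int in the interface instead.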
static int load(int *p)
{
    return _InterlockedOr((long *)p, 0);
}

static int addfetch(int *p, int addend)
{
    return addend + _InterlockedExchangeAdd((long *)p, addend);
}
The officially sanctioned futex functions are WaitOnAddress and WakeByAddressAll. These days they are available in kernel32.dll, but at the time this article was originally written they lived in API-MS-Win-Core-Synch-l1-2-0.dll, linked via -lsynchronization. Gross. Since I cannot stomach that, I instead call the low-level RTL functions where they are actually implemented: RtlWaitOnAddress and RtlWakeAddressAll. They live in ntdll.dll, which is very convenient. As far as I know they are undocumented, but fortunately there are third-party sources that provide not only documentation but also several alternative implementations. Reading that material is instructive; it suggests how to construct futexes on systems that do not yet have them.
These functions are not declared in any header, so I had to declare them myself. The good news is that so far I have managed to avoid the cost of including windows.h, and I can continue without it. The functions are listed in the ntdll.dll import library, so I do not have to craft import library records myself.
__declspec(dllimport)
long __stdcall RtlWaitOnAddress(void *, void *, size_t, void *);
__declspec(dllimport)
long __stdcall RtlWakeAddressAll(void *);
Conveniently enough, their semantics match the semantics of Linux futexes perfectly!
static void wait(int *p, int current)
{
    RtlWaitOnAddress(p, &current, sizeof(*p), 0);
}

static void wake(int *p)
{
    RtlWakeAddressAll(p);
}
As on Linux, there is no informative failure here, so the return values are of no interest to us.
That is the whole implementation. On each supported platform, you get a flexible, lightweight, easy-to-use synchronization tool. It is quite impressive, in my opinion, that it fits in about 50 lines of relatively simple code!