Using sync
Hello to all gophers! In this article I want to analyze how you can use the module sync/atomic
for type float64.
Formulation of the problem
We have a channel from which we can read the results of the tasks. The result of the task contains a flag indicating whether there was an error during the execution of the task and the result of this task (float64 type). You need to find the sum of all the results and the number of errors.
Implementation using sync.Mutex
The problem can be solved using sync.Mutex
:
package main
import (
"fmt"
"math"
"sync"
"sync/atomic"
)
const countWorkers = 1000
const countTasks = 10000000
type Result struct {
value float64
hasError bool
}
func MakeTasks(count int) <-chan Result {
ch := make(chan Result)
go func() {
for i := 0; i < count; i++ {
ch <- Result{
value: float64(i) * 2.42,
hasError: (i % 10) == 0,
}
}
close(ch)
}()
return ch
}
func ProcessUsingMutex(ch <-chan Result, countWorkers int) (float64, int64) {
var wg sync.WaitGroup
var errMu sync.Mutex
var mu sync.Mutex
var countErrors int64
var result float64
for i := 0; i < countWorkers; i++ {
wg.Add(1)
go func() {
for item := range ch {
if item.hasError {
errMu.Lock()
countErrors++
errMu.Unlock()
} else {
mu.Lock()
result += item.value
mu.Unlock()
}
}
wg.Done()
}()
}
wg.Wait()
return result, countErrors
}
func main() {
ch := MakeTasks(countTasks)
fmt.Println(ProcessUsingMutex(ch, countWorkers))
}
Implementation using sync/atomic
But use sync.Mutex
can slow down our program. Therefore, the solution can be rewritten using sync/atomic
. For the number of errors, this is quite easy to do. Instead of using sync.Mutex
use atomic.AddInt64
that is:
errMu.Lock()
countErrors++
errMu.Unlock()
replace with
atomic.AddInt64(&countErrors, 1)
But there is no function for float64 AddFloat64
. But it can be implemented. In order to better understand how this can be done, we write a function AddInt64
using CompareAndSwapInt64
func AddInt64(addr *int64, delta int64) (new int64) {
for {
if v := *addr; atomic.CompareAndSwapInt64(addr, v, v+delta) {
return v
}
}
}
But for float64
no and CompareAndSwap
but we can convert the value float64
in uint64
using math.Float64bits
and operate uint64
.
Implementation AddFloat64
could be like this:
func AddFloat64(addr *uint64, delta float64) uint64 {
for {
cur := atomic.LoadUint64(addr)
curVal := math.Float64frombits(cur)
nxtVal := curVal + delta
nxt := math.Float64bits(nxtVal)
if atomic.CompareAndSwapUint64(addr, cur, nxt) {
return nxt
}
}
}
After we have implemented AddFloat64
we can completely rewrite our function without using mutexes:
func ProcessUsingAtomic(ch <-chan Result, countWorkers int) (float64, int64) {
var wg sync.WaitGroup
var countErrors int64
var total uint64
for i := 0; i < countWorkers; i++ {
wg.Add(1)
go func() {
for item := range ch {
if item.hasError {
atomic.AddInt64(&countErrors, 1)
} else {
AddFloat64(&total, item.value)
}
}
wg.Done()
}()
}
wg.Wait()
return math.Float64frombits(atomic.LoadUint64(&total)), atomic.LoadInt64(&countErrors)
}
To compare mutex and atomic, I wrote a small benchmark
package main
import (
"testing"
)
func BenchmarkProcessUsingAtomic(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
ch := MakeTasks(countTasks)
b.StartTimer()
ProcessUsingAtomic(ch, countWorkers)
}
}
func BenchmarkProcessUsingMutex(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
ch := MakeTasks(countTasks)
b.StartTimer()
ProcessUsingMutex(ch, countWorkers)
}
}
Running this benchmark shows that atomic is 5-7% faster.
BenchmarkProcessUsingAtomic-8 1 2468118459 ns/op
BenchmarkProcessUsingMutex-8 1 2640532917 ns/op
PASS
Conclusion
Module sync/atomic
can be useful if used instead of mutexes, as atomics are slightly faster than mutexes. If you want to use atomics for types that are not supported in sync/atomic
you can try to use standard functions to implement the desired functionality, as I described in this article, or use the structure Value
from module sync/atomic
.