Using sync

Hello to all gophers! In this article I want to analyze how you can use the module sync/atomic for type float64.

Formulation of the problem

We have a channel from which we can read the results of the tasks. The result of the task contains a flag indicating whether there was an error during the execution of the task and the result of this task (float64 type). You need to find the sum of all the results and the number of errors.

Implementation using sync.Mutex

The problem can be solved using sync.Mutex:

package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

const countWorkers = 1000
const countTasks = 10000000

type Result struct {
	value    float64
	hasError bool
}

func MakeTasks(count int) <-chan Result {
	ch := make(chan Result)
	go func() {
		for i := 0; i < count; i++ {
			ch <- Result{
				value:    float64(i) * 2.42,
				hasError: (i % 10) == 0,
			}

		}
		close(ch)
	}()
	return ch
}

func ProcessUsingMutex(ch <-chan Result, countWorkers int) (float64, int64) {
	var wg sync.WaitGroup
	var errMu sync.Mutex
	var mu sync.Mutex
	var countErrors int64
	var result float64

	for i := 0; i < countWorkers; i++ {
		wg.Add(1)
		go func() {
			for item := range ch {
				if item.hasError {
					errMu.Lock()
					countErrors++
					errMu.Unlock()
				} else {
					mu.Lock()
					result += item.value
					mu.Unlock()
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
	return result, countErrors
}


func main() {
	ch := MakeTasks(countTasks)
	fmt.Println(ProcessUsingMutex(ch, countWorkers))
}

Implementation using sync/atomic

But use sync.Mutex can slow down our program. Therefore, the solution can be rewritten using sync/atomic. For the number of errors, this is quite easy to do. Instead of using sync.Mutex use atomic.AddInt64that is:

errMu.Lock()
countErrors++
errMu.Unlock()

replace with

atomic.AddInt64(&countErrors, 1)

But there is no function for float64 AddFloat64. But it can be implemented. In order to better understand how this can be done, we write a function AddInt64 using CompareAndSwapInt64

func AddInt64(addr *int64, delta int64) (new int64) {
	for {
		if v := *addr; atomic.CompareAndSwapInt64(addr, v, v+delta) {
			return v
		}
	}
}

But for float64 no and CompareAndSwapbut we can convert the value float64 in uint64 using math.Float64bits and operate uint64.

Implementation AddFloat64 could be like this:

func AddFloat64(addr *uint64, delta float64) uint64 {
	for {
		cur := atomic.LoadUint64(addr)
		curVal := math.Float64frombits(cur)
		nxtVal := curVal + delta
		nxt := math.Float64bits(nxtVal)
		if atomic.CompareAndSwapUint64(addr, cur, nxt) {
			return nxt
		}
	}
}

After we have implemented AddFloat64 we can completely rewrite our function without using mutexes:

func ProcessUsingAtomic(ch <-chan Result, countWorkers int) (float64, int64) {
	var wg sync.WaitGroup
	var countErrors int64
	var total uint64
	for i := 0; i < countWorkers; i++ {
		wg.Add(1)
		go func() {
			for item := range ch {
				if item.hasError {
					atomic.AddInt64(&countErrors, 1)
				} else {
					AddFloat64(&total, item.value)
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
	return math.Float64frombits(atomic.LoadUint64(&total)), atomic.LoadInt64(&countErrors)
}

To compare mutex and atomic, I wrote a small benchmark

package main

import (
	"testing"
)

func BenchmarkProcessUsingAtomic(b *testing.B) {

	for i := 0; i < b.N; i++ {
		b.StopTimer()
		ch := MakeTasks(countTasks)
		b.StartTimer()
		ProcessUsingAtomic(ch, countWorkers)
	}
}

func BenchmarkProcessUsingMutex(b *testing.B) {
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		ch := MakeTasks(countTasks)
		b.StartTimer()
		ProcessUsingMutex(ch, countWorkers)
	}
}

Running this benchmark shows that atomic is 5-7% faster.

BenchmarkProcessUsingAtomic-8 1 2468118459 ns/op
BenchmarkProcessUsingMutex-8 1 2640532917 ns/op
PASS

Conclusion

Module sync/atomic can be useful if used instead of mutexes, as atomics are slightly faster than mutexes. If you want to use atomics for types that are not supported in sync/atomic you can try to use standard functions to implement the desired functionality, as I described in this article, or use the structure Value from module sync/atomic.

Similar Posts

Leave a Reply