Diving into Concurrency in Go

I wanted to delve deeper into the topic of goroutines with parallelism, and while listening to “The Doors” in the background, I absorbed the information and highlighted interesting thoughts from the articles.

I hope you find what you were looking for.

Goroutines

Appearance

The main source of the appearance of goroutines and channels in Go was CSP. This modelь is based on communication of parallel programs through channels.

Explanation

I think the easiest way to explain goroutines is this: a goroutine is a lightweight wrapper around a thread.

Go uses the following terms: G (Goroutine) — Goroutine M (Machine) — Machine Each Machine runs in a separate thread and can only execute one Goroutine at a time. The scheduler of the operating system in which the program runs switches Machines. The number of running Machines is limited by an environment variable GOMAXPROCS or function runtime.GOMAXPROCS(n int). By default, it is equal to the number of processor cores of the computer on which the application was launched.

To run a function as a goroutine, you need to write go func() where func() is the function you want to run. Note that if you do this:

package main

import "fmt"

func main() {
  // цикл с 5-ю итерациями
  for i := 0; i < 5; i++ {
    // вызываем функцию greeting() как горутину
    go greeting()
 }
}

func greeting() {
  fmt.Println("Hello World!!!")
}

Go playground: https://go.dev/play/p/xdQFn8gEBFW

Nothing will happen, because the goroutine runs in parallel with “main()” and “main()” will finish before the goroutine finishes, and if the “main()” function finishes, the whole program finishes.

In order for the program to work correctly, it is necessary to wait for the goroutine to be executed. It is necessary to use WaitGroup, but more on that in the next chapter. For now, we'll do without it. time.Sleep(2 * time.Second) . The “main()” function will “sleep” for two seconds, thus waiting for “greeting()” to execute. Here is the corrected example:

package main

import (
	"fmt"
	"time"
)

func main() {
         // цикл с 5-ю итерациями
	for i := 0; i < 5; i++ {
                // вызываем функцию greeting() как горутину
		go greeting()
	}
        // ожидаем незавершившиеся горутины
	time.Sleep(2 * time.Second)
}

func greeting() {
	fmt.Println("Hello World!!!")
}

Go playground: https://go.dev/play/p/KRwf_oyd0c1

Package “sync”

WaitGroup(replacement of time.Sleep())

WaitGroup is a primitive synchronization used to wait for multiple goroutines to complete their execution. The “sync” package provides the “sync.WaitGroup” type and its “Add()”, “Done()” and “Wait()” methods. In theory, you use WaitGroup instead of writing time.Sleep(). It is more reliable, and you will stop waiting exactly when everything is done, not when “sleep()” ends. Let's look at a simple example with WaitGroup:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // создаем WaitGroup
    wg := &sync.WaitGroup{}
    // цикл с 5-ю итерациями
    for i := 0; i < 5; i++ {
        // добавляем в список ожидания одну горутину
        wg.Add(1)
        go func(i int) {
            // говорим, чтобы в конце анонимной функции одна горутина из списка ожидания исчезла
            defer wg.Done()
            // засыпаем, имитируя какую-то работу
            time.Sleep(time.Duration(i) * time.Second)
            fmt.Println("Горутина", i, "завершила свое выполнение")
        }(i)
    }
    // ожидаем незавершившиеся горутины
    wg.Wait()
    fmt.Println("Все горутины завершили свое выполнение!")
}

Go to playground: https://go.dev/play/p/XdsoK-FSPR_m

Here we used WaitGroup to wait for five goroutines. Before starting each of them, we used “wg.Add(1)” to increase the goroutine counter by 1. Inside the goroutines, we called “wg.Done()” (which subtracts 1 from the goroutine counter) when the goroutine finished executing. Finally, in “main()” we called “wg.Wait()” to wait for all goroutines to finish.

Mutex

Mutex (short for “mutual exclusion”) is a primitive synchronization used to protect data from being used by only one goroutine. In Go, the “sync” package provides the “sync.Mutex” type, which has two methods: “Lock()” and “Unlock()”.

Let's look at a simple example of using a mutex to protect a counter:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var (
      counter int // счетчик
      lock    sync.Mutex // наш mutex
    )
    
    // создаем WaitGroup
    wg := &sync.WaitGroup{}

    // цикл с 5-ю итерациями
    for i := 0; i < 5; i++ {
        // добавляем в список ожидания одну горутину
        wg.Add(1)
        go func() {
            // говорим, чтобы в конце анонимной функции одна горутина из списка ожидания исчезла
            defer wg.Done()
            // используем mutex для блокировки использования счетчика другими горутинами
            lock.Lock()
            // увеличиваем счетчик
            counter++
            fmt.Println("Счетчик:", counter)
            // разблокируем счетчик
            lock.Unlock()
        }()
    }
    // ожидаем незавершившиеся горутины
    wg.Wait()
    fmt.Println("Итоговый счетчик:", counter)
}

Go to playground: https://go.dev/play/p/eDm4HHbAGEB

In this example, we used “sync.Mutex” to protect the “counter” variable from being used. Before updating the counter, we call “lock.Lock()” to create a lock. Now no other goroutine can use “counter”. After updating the counter, we call “lock.Unlock()” to remove the lock. Once the counter is unlocked, anyone can use it again.

Atomic operations/atomic operations

Atomic operations are low-level synchronization primitives that provide a way to perform read-modify-write operations on shared variables without requiring locking (An atomic operation is an operation that is either completely completed or not completed at all; an operation that cannot be partially completed and partially not completed). The “sync/atomic” package provides several atomic operations for ints, pointers, and floats. Here is a simple example using atomic operations:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    // объявляем счетчик
    var counter int64

    // создаем WaitGroup
    wg := &sync.WaitGroup{}

    // цикл с 5-ю итерациями
    for i := 0; i < 5; i++ {
        // добавляем в список ожидания одну горутину
        wg.Add(1)
        go func() {
            // говорим, чтобы в конце анонимной функции одна горутина из списка исчезла
            defer wg.Done()
            // атомарной операцией добалвяем к счетчику единицу
            atomic.AddInt64(&counter, 1)
            fmt.Println("Счетчик:", atomic.LoadInt64(&counter))
        }()
    }
    // ожидаем незавершившиеся горутины
    wg.Wait()
    fmt.Println("Итоговый счетчик:", counter)
}

Go to playground: https://go.dev/play/p/cLvSy7Mccdj

In this example, we used the “atomic.AddInt64()” function to atomically modify the “counter” variable. This function adds the given value to the counter and returns the new one. We also used the “atomic.LoadInt64()” function to atomically read the current value of the counter (“counter” variable). Note that we do not need to use a mutex to protect the counter, since atomic operations ensure its safe updating.

Channels

Definition

In simple terms, channels are communication tools between goroutines.

Technically, it is a pipeline from which you can read or put data. That is, one goroutine can send data to a channel, and another can read the data put into that channel.

Usage

Creation

The channel is created like this: ch := make(chan int) . In place of “int” there can be any data type, for example: string.

You can also use “make()” with a second argument – the channel size. This way it will be buffered.

Length and capacity

A buffered channel has a length and a capacity. The length of a channel is the number of values ​​in the queue in the channel buffer that have not been read. The capacity is the size of the channel buffer itself. To get the length, we use the function len(Ваш канал)and to obtain capacity – cap(Ваш канал).

Closing

The channel can also be closed. To do this, you need to call the function close(Ваш канал)thereby blocking access to reading from it. If you need to check whether the channel is closed, or whether it is possible to read data from it, write the following expression: val, ok := <- Ваш каналwhere val is a variable into which the value from the channel will be written, if possible, and ok is a boolean variable, where true means that data can be read from the channel, and false means that it is closed/unable to read.

For range

Using for range, you can read data from a closed buffered channel, since the data remains in the buffer even after the channel is closed. To do this, you need to write a for range structure like this: for elem := range Ваш канал where elem is an element from the channel. Here is an example:

package main

import "fmt"

func main() {
        // создаем канал
       c := make(chan int, 5)

        // записываем числа в канал
       c <- 1
       c <- 2
       c <- 101
       c <- 102
       c <- 1000

       // закрываем канал 
       close(c)

       // циклом идем по каналу
       for num := range c {
         fmt.Println(num)
       }
}

Go to playground: https://go.dev/play/p/DjMEfLAsyZk

Deadlock

If you read data from an empty channel, you will get the “Deadlock” error, which appears when reading from a channel infinitely. It appears because when reading data from a channel, the program will try to read it to the last, even if there is none. It turns out that if there is no data and will never be in the channel, the program will try endlessly. This is how Deadlock appears. Here is an example:

package main

func main() {
        // создаем канал
	c := make(chan int)

        // пытаемся получить данные с канала, которые отсутствуют. Deadlock!
	<-c
}

Go to playground: https://go.dev/play/p/1CSAVnWNEOf

Select

Definition

select – it's almost that switchbut without arguments. There is one more feature – it is used only for operations with channels.

Usage

Select when two channels are ready to read data simultaneously

package main
import "fmt"

func main() {

  // создаем каналы
  number := make(chan int)
  message := make(chan string)
  
 // вызываем функции как горутины
 go channelNumber(number)
 go channelMessage(message)

 // select
 select {
   case firstChannel := <- number:
     fmt.Println("Данные канала:", firstChannel)

   case secondChannel := <- message:
     fmt.Println("Данные канала:", secondChannel)
 }

}

// горутина, которая записывает число в канал
func channelNumber(number chan int) {
  // записываем число в канал
  number <- 15
}

// горутина, которая записывает строку в канал
func channelMessage(message chan string) {
  // записываем строку в канал
  message <- "Погружение в параллелизм в Go"
}

Go to playground: https://go.dev/play/p/xlXKMfRVBSh

Here in the select we choose one of the channels. One goroutine writes data to the channel “number”, and the other – to “message”. Since both channels are prepared, the result of the program is random (but in fact, in this case it will be “Dive into parallelism in Go”)

Select when one of the channels is ready first

package main

import (
  "fmt"
  "time"
)

func main() {

   // создаем каналы
  number := make(chan int)
  message := make(chan string)

  // вызываем функции как горутины
  go channelNumber(number)
  go channelMessage(message)

  // select
  select {
    case firstChannel := <-number:
      fmt.Println("Данные канала:", firstChannel)

    case secondChannel := <-message:
      fmt.Println("Данные канала:", secondChannel)
  }

}

// горутина для записи числа в канал
func channelNumber(number chan int) {
  // записываем число в канал
  number <- 15
}

// горутина для записи строки в канал
func channelMessage(message chan string) {
  
  // функция засыпает на 2 секунды
  time.Sleep(2 * time.Second)

  // записываем строку в канал
  message <- "Погружение в параллелизм в Go"
}

Go to playground: https://go.dev/play/p/vSVJT_F_Hn1

Here, too, there are two functions that write data to the “number” and “message” channels, but under one condition. When writing to the “message” channel, the function sleeps for 2 seconds, making “message” not ready for use by the time “number” is already filled.

Select when two channels are delayed by 2 seconds at the same time

package main
import (
  "fmt"
  "time"
)

func main() {

  // создаем каналы
  number := make(chan int)
  message := make(chan string)

  // вызываем функции как горутины
  go channelNumber(number)
  go channelMessage(message)

  // select
  select {
    case firstChannel := <-number:
      fmt.Println("Данные канала:", firstChannel)

    case secondChannel := <-message:
      fmt.Println("Данные канала:", secondChannel)
  }

}

// горутина для записи числа в канал
func channelNumber(number chan int) {

  // функция засыпает на 2 секунды
  time.Sleep(2 * time.Second)

  number <- 15
}

// горутина для записи строки в канал
func channelMessage(message chan string) {

  // функция засыпает на 2 секунды
  time.Sleep(2 * time.Second)

  message <- "Погружение в параллелизм в Go"
}

Go to playground: https://go.dev/play/p/2OQOmTMmkt-

In this case, both functions fall asleep for 2 seconds. What will select do? Answer: nothing. It will wait until at least one channel is ready for reading. So, here, as in the first case, the answer will be random, since after 2 seconds both channels will already be ready

Default

package main

import (
  "fmt"
  "time"
)

func main() {

  // создаем каналы
  number := make(chan int)
  message := make(chan string)

  // вызываем функции как горутины
  go channelNumber(number)
  go channelMessage(message)

  // select
  select {
    case firstChannel := <-number:
      fmt.Println("Данные канала:", firstChannel)

    case secondChannel := <-message:
      fmt.Println("Данные канала:", secondChannel)
	
    // default case 
    default:
      fmt.Println("Подожди!!! Каналы еще не готовы к чтению!")
  }

}

// горутина для записи числа в канал
func channelNumber(number chan int) {
  
  // функция засыпает на 2 секунды
  time.Sleep(2 * time.Second)

  number <- 15
}

// горутина для записи строки в канал
func channelMessage(message chan string) {

  // функция засыпает на 2 секунды
  time.Sleep(2 * time.Second)
 
  message <- "Погружение в параллелизм в Go"
}

Go to playground: https://go.dev/play/p/S7psSqvugQS

Here – instead of pausing the execution of the “main” function by select while waiting for the channels to be ready, we add default (yes, it's the same as in switch-case) and now, if none of the channels are ready to execute select, the console will display the message “Wait!!! The channels are not ready for reading yet!”

Patterns (concurrency patterns)

Fan-out

Fan out is used when multiple goroutines are reading from the same channel. This reading will only finish when the channel is closed.

Example:

package main
 
import (
   "fmt"
   "time"
)
 
// функция по заполнению канала числами
func generate(nums ...int) <-chan int {
   // создаем канал
   out := make(chan int)
   go func() {
       // идем циклом по переданным числам
       for _, n := range nums {
           // записываем каждое число в канал
           out <- n
       }
   }()
   
   // возвращаем канал
   return out
}

func main() {
   fmt.Println("Запускаем Fan Out ")
   // генерируем два канала: один - с числами 1, 2, 3, а второй - с числами 4, 5, 6
   c1 := generate(1, 2, 3)
   c2 := generate(4, 5, 6)
 
   // запускаем первую горутину
   go func() {
       // циклом идем по первому каналу и печатаем каждое число из него
       for num := range c1 {
           fmt.Println(num)
       }
   }()
   // запускаем вторую горутину
   go func() {
       // циклом идем по второму каналу и печатаем каждое число из него
       for num := range c2 {
           fmt.Println(num)
       }
   }()
 
   // ожидаем незавершившиеся горутины
   time.Sleep(time.Second * 2)
}

Go to playground: https://go.dev/play/p/Mq8JluCtkNC

Here we call the “generate()” function twice, in which we define the int channel, start the goroutine and write the numbers passed to the function into it. After calling the goroutine, we return the channel. Having received two channels, we start two goroutines that will receive numbers from channel 1 and channel 2, and then output them to the console. At the end, we “sleep” for 2 seconds to wait for all goroutines to complete.

Fan-in

Fan in is used when a single function reads from multiple channels until they are closed. This is useful for, for example, aggregating the results of parallel tasks.

Let's write Fan-in:

func merge(in ...<-chan int) <-chan int {
   // создаем WaitGroup
   var wg sync.WaitGroup

   // создаем итоговый канал
   out := make(chan int)
 
   // записываем функцию в переменную "output"
   output := func(c <-chan int) {
       // говорим, чтобы в конце анонимной функции одна горутина из списка ожидания исчезла
       defer wg.Done()
       // циклом идем по каналу "c"
       for n := range c {
           // в итоговый канал "out" записываем числа из канала "c"
           out <- n
       }
   }
   // добавляем в список ожидания столько же горутин, сколько каналов "in" было передано
   wg.Add(len(in))
   // циклом идем по всем каналам "in"
   for _, c := range in {
       // вызываем "output" как горутину
       go output(c)
   }
   // ожидаем незавершившиеся горутины
   wg.Wait()
   return out
}

In this function we combine several “in” channels into one “out” channel.

Briefly about pipeline

A pipeline is a set of handlers, each of which takes some input data, does something with it, processes it, and passes it on to the next one.

Conclusion

I spent a long time writing this article, extracting the most important things from each source and, slightly reworking, finishing, I wrote everything here. I hope you read for yourself what you were looking for. And here are all the sources from which I took the information:

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *