Multithreading Patterns in Go

Hello! If you are reading this article, you are probably interested in Go and want to improve your multithreading skills. You are surely already familiar with the basic concurrency primitives (goroutines, channels, mutexes) and use them actively. In this article I will show how to arrange them into the most popular patterns, keeping your future systems scalable and maintainable.

Why is multithreading so important? Today's apps need to be fast and responsive, which requires multitasking.

Imagine that your program handles user requests, downloads data from a server, and performs calculations, all at the same time, and all of it must happen without delays or errors. For that you have goroutines! Handled carelessly, however, they can produce bugs that are hard to track down. This is where multithreading patterns come into play, helping to structure and optimize parallel work.

You can save this article and use it as a cheat sheet. Here is a list of patterns that we will analyze today:

  • Future/Promise – an asynchronous request to perform a task that does not block the main thread.

  • Generator – a simple and convenient way to create a data stream. It runs a goroutine that generates values and passes them through a channel.

  • Pipeline – a powerful pattern that allows you to break a task into several stages of data processing, organizing a smooth flow through goroutines.

  • Fan-in and Fan-out – fan-out helps to parallelize the execution of one task across several goroutines, and fan-in helps to collect the results of these goroutines into one data stream.

  • Semaphore – a convenient tool for controlling the number of simultaneously executed goroutines, protecting against system overloads.

  • Worker Pool – organizes a set of goroutines for parallel processing of tasks, which is especially useful for optimizing performance.

  • Handling errors in goroutines – a pattern that helps to correctly and safely handle errors that occur in goroutines through channels.

Future/Promise

Let's start with the sweetest one. JavaScript developers are rubbing their hands right now, and yes, this is something like async/await in JavaScript. The pattern allows you to run tasks in the background and receive their results as those tasks complete, without blocking the main thread of the program.

But of course there is more code than in JavaScript:

package main

import (
    "fmt"
    "time"
)

func Promise(task func() int) chan int {
    resultCh := make(chan int, 1) // create a channel for the result

    go func() {
        result := task()          // run the task
        resultCh <- result        // send the result to the channel
        close(resultCh)           // close the channel when done
    }()

    return resultCh
}

func main() {
    // A task that takes 2 seconds
    longRunningTask := func() int {
        time.Sleep(2 * time.Second)
        return 42
    }

    // Start the task through Promise
    future := Promise(longRunningTask)

    fmt.Println("Task started, we can do something else...")

    // Wait for the result
    result := <-future
    fmt.Println("Result:", result)
}

In this example, the task runs in the background while you do something else. You then receive the result through the channel, which acts as the future: once the task completes, the result becomes available and the program continues its execution.

This approach is convenient because it avoids blocking the main thread of execution, which matters especially when the task takes a long time. You simply start a task, do other things, and come back for its result once it is ready.

If you need to handle errors in such an asynchronous task, you can extend the example to pass both the result and the error. This is done with a struct that contains the execution result together with the error, so you not only obtain the task's result but can also handle possible failures.

Look:

package main

import (
    "errors"
    "fmt"
    "time"
)

type Result struct {
    value int
    err   error
}

func Promise(task func() (int, error)) chan Result {
    resultCh := make(chan Result, 1) // create a channel for the result

    go func() {
        value, err := task()                       // run the task
        resultCh <- Result{value: value, err: err} // send the result and the error to the channel
        close(resultCh)                            // close the channel
    }()

    return resultCh
}

func main() {
    // A task that returns an error
    taskWithError := func() (int, error) {
        time.Sleep(2 * time.Second)
        return 0, errors.New("something went wrong")
    }

    // Start the task through Promise
    future := Promise(taskWithError)

    fmt.Println("Task started, we can do something else...")

    // Wait for the result
    result := <-future
    if result.err != nil {
        fmt.Println("Error:", result.err)
    } else {
        fmt.Println("Result:", result.value)
    }
}

Now, if a task fails, we can handle the failure properly instead of just ignoring it or logging it. This makes the code more reliable and gives better control over asynchronous operations.

The Future/Promise pattern is well suited for cases when you want to run a task in the background and get its result without blocking the program. This is a great way to handle long operations, such as network requests or complex calculations, while doing other useful things without losing control of the results.

Generator

If we build something in Go that involves multithreading, it will most often follow the producer-consumer scheme: the producer generates data and sends it over a channel, and the consumer receives and processes it. This lets you process data in parallel without loading everything into RAM, which is especially useful when working with large amounts of information or even endless streams of data. This is exactly what the Generator pattern is for.

The generator creates values and sends them to the consumer through a channel. Sending and receiving both block until the other side is ready, so data is processed as it arrives, minimizing delays.

Look at the example:

package main

import "fmt"

func main() {
    // Data to be sent into the channel
    items := []int{10, 20, 30, 40, 50}

    // Get a channel with data from the generator
    dataChannel := generator(items)

    // The consumer processes data from the channel
    process(dataChannel)
}

// generator creates a channel and starts a goroutine that sends data into it
func generator(items []int) chan int {
    ch := make(chan int)

    go func() {
        // Close the channel once all data has been sent
        defer close(ch)

        // Iterate over the items and send them into the channel
        for _, item := range items {
            ch <- item
        }
    }()

    return ch
}

// process receives data from the channel and prints it
func process(ch chan int) {
    for item := range ch {
        fmt.Println(item)
    }
}

In the example, the generator sends data to the channel through a separate goroutine, and then the consumer processes it, reading one value at a time. Note that the generator closes the channel upon completion of work, which is important for signaling the end of data transmission to the consumer.

This pattern is convenient because the generator and consumer can work simultaneously without waiting for all calculations to complete. This saves memory because data is processed as it arrives rather than being stored in memory entirely. This is especially useful when working with large or endless streams of data.
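For a truly endless stream, the generator must be stoppable from the outside, otherwise its goroutine would leak once the consumer loses interest. A sketch (the naturals name is mine) using a done channel:

```go
package main

import "fmt"

// naturals endlessly emits 0, 1, 2, ... until doneCh is closed.
func naturals(doneCh chan struct{}) chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; ; i++ {
			select {
			case <-doneCh: // the consumer asked us to stop
				return
			case ch <- i:
			}
		}
	}()
	return ch
}

func main() {
	doneCh := make(chan struct{})
	defer close(doneCh) // stops the generator when main exits

	ch := naturals(doneCh)
	for i := 0; i < 5; i++ {
		fmt.Println(<-ch) // consume only as much as we need
	}
}
```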

Pipeline

This is a cool pattern that may seem complicated and unnecessary at first, but in some cases it is indispensable. Pipeline lets you split a complex task into several simple stages: the output of one stage becomes the input of the next, and the process continues until the whole chain has run. This approach helps break data processing into stages and manage data flows effectively.

In the first example, see how it works without using goroutines:

package main

import "fmt"

func main() {
    value := 1

    // example 1: addition, then multiplication
    fmt.Println(multiply(add(value, 2), 3))

    // example 2: stages swapped
    fmt.Println(add(multiply(value, 2), 3))
}

// add returns the sum of its arguments
func add(a, b int) int {
    return a + b
}

// multiply returns the product of its arguments
func multiply(a, b int) int {
    return a * b
}

As you noticed, all operations are performed sequentially. One stage begins only when the previous one has finished processing all the data. However, using goroutines, you can process data in parallel, speeding up execution.

Now I'll add goroutines and channels to make our pipeline more efficient. Let's change the add and multiply functions so that they work with channels:

package main

import "fmt"

// add adds 2 to each value from inputCh and returns a channel with the results
func add(doneCh chan struct{}, inputCh chan int) chan int {
    resultCh := make(chan int)

    go func() {
        defer close(resultCh)

        for value := range inputCh {
            result := value + 2

            select {
            case <-doneCh: // time to stop the goroutine
                return
            case resultCh <- result: // send the result
            }
        }
    }()

    return resultCh
}

// multiply multiplies each value by 3 and returns a channel with the results
func multiply(doneCh chan struct{}, inputCh chan int) chan int {
    resultCh := make(chan int)

    go func() {
        defer close(resultCh)

        for value := range inputCh {
            result := value * 3

            select {
            case <-doneCh:
                return
            case resultCh <- result:
            }
        }
    }()

    return resultCh
}

// generator sends data into a channel
func generator(doneCh chan struct{}, numbers []int) chan int {
    outputCh := make(chan int)

    go func() {
        defer close(outputCh)

        for _, num := range numbers {
            select {
            case <-doneCh:
                return
            case outputCh <- num:
            }
        }
    }()

    return outputCh
}

func main() {
    // data to process
    numbers := []int{1, 2, 3, 4, 5}

    // channel used to stop the goroutines
    doneCh := make(chan struct{})
    defer close(doneCh)

    // start the generator that emits the numbers
    inputCh := generator(doneCh, numbers)

    // pipeline stages: add first, then multiply
    addCh := add(doneCh, inputCh)
    resultCh := multiply(doneCh, addCh)

    // print the results
    for res := range resultCh {
        fmt.Println(res)
    }
}

Thus, when you pass the numbers into the pipeline, the data goes through several stages of parallel processing. For example, while one number is being multiplied, another can already be going through the addition stage. This lets you parallelize work on large amounts of data without piling all the load onto a single CPU core.

The Pipeline pattern is perfect for tasks whose processing can be divided into stages. For example, you can build a conveyor that fetches data, filters it, and saves the results to a database.
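A stage in such a conveyor does not have to transform values; it can also drop them. Below is a sketch of a filter stage built in the same shape as the add and multiply stages above (the filter function and its keep predicate are my own names):

```go
package main

import "fmt"

// filter passes through only the values for which keep returns true.
func filter(doneCh chan struct{}, inputCh chan int, keep func(int) bool) chan int {
	resultCh := make(chan int)
	go func() {
		defer close(resultCh)
		for v := range inputCh {
			if !keep(v) {
				continue // drop the value instead of transforming it
			}
			select {
			case <-doneCh:
				return
			case resultCh <- v:
			}
		}
	}()
	return resultCh
}

// generator sends a slice into a channel, as in the examples above.
func generator(doneCh chan struct{}, numbers []int) chan int {
	outputCh := make(chan int)
	go func() {
		defer close(outputCh)
		for _, n := range numbers {
			select {
			case <-doneCh:
				return
			case outputCh <- n:
			}
		}
	}()
	return outputCh
}

func main() {
	doneCh := make(chan struct{})
	defer close(doneCh)

	keepEven := func(v int) bool { return v%2 == 0 }
	for v := range filter(doneCh, generator(doneCh, []int{1, 2, 3, 4, 5, 6}), keepEven) {
		fmt.Println(v) // prints 2, 4, 6
	}
}
```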

Fan-In Fan-Out

As they say, let's start from the end. The Fan-Out pattern allows you to distribute calculations across several goroutines for parallel data processing. This way you can increase your throughput.

Suppose you have a function add() that requires a lot of resources to execute. To optimize its performance, you can increase the number of goroutines.

Here's an example:

package main

import (
    "fmt"
    "time"
)

// generator creates a channel with the data
func generator(doneCh chan struct{}, numbers []int) chan int {
    dataStream := make(chan int)

    go func() {
        defer close(dataStream)
        for _, num := range numbers {
            select {
            case <-doneCh:
                return
            case dataStream <- num:
            }
        }
    }()

    return dataStream
}

// add adds 1 to each value
func add(doneCh chan struct{}, inputCh chan int) chan int {
    resultStream := make(chan int)

    go func() {
        defer close(resultStream)
        for num := range inputCh {
            // simulate more expensive work
            time.Sleep(time.Second)
            result := num + 1

            select {
            case <-doneCh:
                return
            case resultStream <- result:
            }
        }
    }()

    return resultStream
}

// multiply multiplies each value by 2
func multiply(doneCh chan struct{}, inputCh chan int) chan int {
    resultStream := make(chan int)

    go func() {
        defer close(resultStream)
        for num := range inputCh {
            result := num * 2

            select {
            case <-doneCh:
                return
            case resultStream <- result:
            }
        }
    }()

    return resultStream
}

// fanOut starts several add goroutines to process the data in parallel
func fanOut(doneCh chan struct{}, inputCh chan int, workers int) []chan int {
    resultChannels := make([]chan int, workers)

    for i := 0; i < workers; i++ {
        resultChannels[i] = add(doneCh, inputCh)
    }

    return resultChannels
}

The fanOut() function starts several goroutines running add(), so the data is processed in parallel. If this stage becomes slow because of heavy calculations, we can simply split the work into 10 parts and execute them independently of each other.

What if you need to merge these channels back? There is a Fan-In pattern for this. It combines several channels into one.

Let's collect the results:

// fanIn merges the results of several channels into one
func fanIn(doneCh chan struct{}, channels ...chan int) chan int {
    finalStream := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        chCopy := ch
        wg.Add(1)

        go func() {
            defer wg.Done()
            for value := range chCopy {
                select {
                case <-doneCh:
                    return
                case finalStream <- value:
                }
            }
        }()
    }

    go func() {
        wg.Wait()
        close(finalStream)
    }()

    return finalStream
}

As you may have noticed, the fanIn() function takes multiple channels and runs a separate goroutine to drain each of them. The data is merged into one final channel, which is returned to the caller.

Let's try using both functions together:

package main

import (
    "fmt"
    "sync"
    "time" // needed by add() above when the snippets are combined into one file
)

func main() {
    numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    doneCh := make(chan struct{})
    defer close(doneCh)

    inputCh := generator(doneCh, numbers)

    // create 10 add goroutines with fanOut
    channels := fanOut(doneCh, inputCh, 10)

    // merge the results from all the channels
    addResultCh := fanIn(doneCh, channels...)

    // pass the result to the next stage, multiply
    resultCh := multiply(doneCh, addResultCh)

    // print the results
    for result := range resultCh {
        fmt.Println(result)
    }
}

What's happening:

  1. Fan-Out: we create 10 goroutines using fanOut(), each executing the add() function. This increases the throughput of the add() stage.

  2. Fan-In: using fanIn() we merge the results of all the add() goroutines into one stream.

  3. Pipeline: we pass that stream to the next processing stage, where the data is multiplied by 2 by the multiply() function.

In general, Fan-Out and Fan-In help process large amounts of data in parallel, increasing program throughput. However, the order of the output data may differ from the order of the input data, since different goroutines may complete at different times. This approach is useful for optimizing the execution time of resource-intensive program steps, especially when it is important to process data quickly.
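When the original order does matter, a common trick is to tag each value with its input index before fanning out and to restore the order after fanning in. A sketch of the idea (the indexed struct and processAll helper are my own, not part of the pattern itself):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// indexed carries a value together with its position in the input.
type indexed struct {
	pos   int
	value int
}

// processAll fans the work out over `workers` goroutines and restores input order.
func processAll(input []int, workers int, f func(int) int) []int {
	jobs := make(chan indexed)
	results := make(chan indexed)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- indexed{pos: j.pos, value: f(j.value)}
			}
		}()
	}

	go func() {
		for i, v := range input {
			jobs <- indexed{pos: i, value: v}
		}
		close(jobs) // no more work for the pool
		wg.Wait()
		close(results) // all workers done, safe to close
	}()

	// collect everything and sort back into input order
	var out []indexed
	for r := range results {
		out = append(out, r)
	}
	sort.Slice(out, func(a, b int) bool { return out[a].pos < out[b].pos })

	ordered := make([]int, len(out))
	for i, r := range out {
		ordered[i] = r.value
	}
	return ordered
}

func main() {
	// always prints [10 20 30 40 50], in input order
	fmt.Println(processAll([]int{1, 2, 3, 4, 5}, 3, func(v int) int { return v * 10 }))
}
```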

Semaphore

Probably my favorite pattern, I use it often. It is needed when you need to limit the number of goroutines that simultaneously work with a certain resource. This helps avoid overloading a resource, such as a database, and allows you to control the number of tasks running in parallel.

If you have worked with relational databases, the idea will look familiar. A semaphore can be implemented with a buffered channel. Here's a code example:

package main

// Semaphore limits the number of goroutines running in parallel
type Semaphore struct {
    semaCh chan struct{}
}

// NewSemaphore creates a semaphore with the given maximum capacity
func NewSemaphore(maxReq int) *Semaphore {
    return &Semaphore{
        semaCh: make(chan struct{}, maxReq),
    }
}

// Acquire reserves a slot in the semaphore
func (s *Semaphore) Acquire() {
    s.semaCh <- struct{}{}
}

// Release frees a slot in the semaphore
func (s *Semaphore) Release() {
    <-s.semaCh
}

The operating principle of Semaphore is simple:

  • Acquire() sends an empty struct into the buffered channel; if the buffer is already full, the call blocks until a slot frees up.

  • Release() reads one value out of the channel, freeing a slot for the next goroutine.

  • The channel capacity is therefore the maximum number of goroutines that can hold the semaphore at the same time.

Now let's create a semaphore with a capacity of 2 that will only allow 2 goroutines to pass through at a time. The remaining goroutines will wait their turn:

package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    // create a semaphore that lets only two goroutines work at a time
    semaphore := NewSemaphore(2)

    // start 10 goroutines
    for i := 0; i < 10; i++ {
        wg.Add(1)

        go func(taskID int) {
            // reserve a slot in the semaphore before starting work
            semaphore.Acquire()

            // when the goroutine finishes, free the slot and decrement the WaitGroup counter
            defer wg.Done()
            defer semaphore.Release()

            // simulate the goroutine's work
            log.Printf("Worker %d started", taskID)
            time.Sleep(1 * time.Second)
        }(i)
    }

    // wait for all goroutines to finish
    wg.Wait()
}

What's happening in the code:

  • We create a semaphore with a capacity of 2 and start 10 goroutines, each incrementing the WaitGroup counter.

  • Every goroutine calls Acquire() before doing its work, so no more than two of them run at the same time.

  • When a goroutine finishes, the deferred Release() and wg.Done() free the slot and decrement the counter; wg.Wait() returns once all 10 have completed.

Result of program execution:

❯ go run main.go
2024/10/22 20:32:44 Worker 0 started
2024/10/22 20:32:44 Worker 1 started
2024/10/22 20:32:45 Worker 2 started
2024/10/22 20:32:45 Worker 3 started
2024/10/22 20:32:46 Worker 4 started
2024/10/22 20:32:46 Worker 5 started
2024/10/22 20:32:47 Worker 6 started
2024/10/22 20:32:47 Worker 7 started
2024/10/22 20:32:48 Worker 8 started
2024/10/22 20:32:48 Worker 9 started

As you already noticed, the semaphore allows only 2 goroutines to pass at a time, and the remaining goroutines begin to execute only after the previous ones have completed. It is suitable for situations where resources, such as a database or API, can only handle a limited number of requests at once.
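Sometimes you would rather reject work than wait for a slot, for example when an API is already saturated. The same buffered channel supports a non-blocking variant through select with a default branch (TryAcquire is my own name here; the weighted semaphore in golang.org/x/sync/semaphore offers a similar method):

```go
package main

import "fmt"

// Semaphore limits the number of goroutines working concurrently.
type Semaphore struct {
	semaCh chan struct{}
}

// NewSemaphore creates a semaphore with the given maximum capacity.
func NewSemaphore(maxReq int) *Semaphore {
	return &Semaphore{semaCh: make(chan struct{}, maxReq)}
}

// TryAcquire reserves a slot if one is free and reports whether it succeeded.
func (s *Semaphore) TryAcquire() bool {
	select {
	case s.semaCh <- struct{}{}:
		return true
	default:
		return false // all slots are busy, do not block
	}
}

// Release frees a previously acquired slot.
func (s *Semaphore) Release() {
	<-s.semaCh
}

func main() {
	sem := NewSemaphore(1)
	fmt.Println(sem.TryAcquire()) // true: a slot was free
	fmt.Println(sem.TryAcquire()) // false: the only slot is taken
	sem.Release()
	fmt.Println(sem.TryAcquire()) // true again
}
```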

Worker Pool

A must-have and probably the most hyped pattern; it is used all the time. It processes tasks from a shared queue in parallel using a pool of workers.

Main elements:

  • A jobs channel – the shared queue from which the workers take tasks.

  • A results channel – where the workers send the outcome of each task.

  • A fixed pool of worker goroutines that read from jobs until the channel is closed.

Worker Pool is also convenient because it frees you from manually creating and shutting down a goroutine for every task in the queue: the workers pick tasks up automatically as they appear.

Let's look at the code:

package main

import (
    "fmt"
    "time"
)

// worker represents a single worker process.
// It takes the worker's id, a channel of jobs and a channel for the results.
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d started job %d\n", id, job)
        time.Sleep(time.Second) // simulate doing the job
        fmt.Printf("Worker %d finished job %d\n", id, job)
        results <- job * 2 // send the result
    }
}

func main() {
    const numJobs = 5 // number of jobs to run
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // create a pool of 3 workers
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // send the jobs into the jobs channel
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    // close the jobs channel so the workers know there will be no more jobs
    close(jobs)

    // collect the results from the workers
    for r := 1; r <= numJobs; r++ {
        res := <-results
        fmt.Printf("Result: %d\n", res)
    }
}

How this code works:

  1. We create buffered jobs and results channels and start a pool of 3 workers.

  2. We send 5 jobs into the jobs channel and close it so the workers know no more jobs are coming.

  3. Each worker takes jobs from the queue, simulates a second of work, and sends job * 2 into results, from which main reads exactly numJobs values.

Example output:

Worker 1 started job 1
Worker 2 started job 2
Worker 3 started job 3
Worker 1 finished job 1
Worker 1 started job 4
Worker 2 finished job 2
Worker 2 started job 5
Worker 3 finished job 3
Worker 1 finished job 4
Worker 2 finished job 5
Result: 2
Result: 4
Result: 6
Result: 8
Result: 10

Worker Pool is great for situations where you need to process a large number of similar tasks, such as incoming API requests, working with files, database queries and other tasks that require parallel processing. The pattern allows you to effectively distribute the load without creating an excessive number of goroutines.
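In the example above, main knows exactly how many results to expect and reads that many. When the number of results is not known in advance, a sync.WaitGroup can close the results channel after every worker has exited, letting the consumer simply range over it. A sketch (the runPool helper is my own naming):

```go
package main

import (
	"fmt"
	"sync"
)

// runPool processes every job with `workers` goroutines and closes
// the returned channel when all of them are done.
func runPool(jobs []int, workers int, f func(int) int) chan int {
	jobsCh := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobsCh {
				results <- f(job)
			}
		}()
	}

	go func() {
		for _, j := range jobs {
			jobsCh <- j
		}
		close(jobsCh) // no more jobs for the workers

		wg.Wait()      // wait until every worker has exited
		close(results) // now it is safe to end the consumer's range loop
	}()

	return results
}

func main() {
	for res := range runPool([]int{1, 2, 3, 4, 5}, 3, func(j int) int { return j * 2 }) {
		fmt.Println("Result:", res)
	}
}
```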

Handling errors in goroutines

When you work with goroutines, each one runs independently of its parent. This creates certain difficulties in error handling: errors in parallel goroutines may go undetected unless they are returned to the main thread of the program. Next, I'll show you how you can handle errors that occur in goroutines and pass them back for further processing.

Example without error handling:

package main

import (
    "errors"
    "log"
    "time"
)

func main() {
    input := []int{1, 2, 3, 4}

    // the generator returns a channel with the data
    inputCh := generator(input)

    // the consumer that processes the data
    go consumer(inputCh)

    // sleep for a moment so the errors have time to be printed
    time.Sleep(time.Second)
}

// generator sends data into a channel and closes it
func generator(input []int) chan int {
    inputCh := make(chan int)

    go func() {
        defer close(inputCh)

        for _, data := range input {
            inputCh <- data
        }
    }()
    return inputCh
}

// consumer receives data and calls a function that returns an error
func consumer(ch chan int) {
    for data := range ch {
        err := callDatabase(data)
        if err != nil {
            log.Println(err) // just log the error
        }
    }
}

// callDatabase simulates a database call and always returns an error
func callDatabase(data int) error {
    return errors.New("database request error")
}

This code simply logs errors that occur in the goroutine but does not pass them back to the main thread of the program. That is not the best solution, since the errors are never collected and handled in one place. Let's fix this.

Now we will pass errors from goroutines through a channel using the structure:

package main

import (
    "errors"
    "log"
)

// Result holds the outcome and the error
type Result struct {
    data int
    err  error
}

func main() {
    input := []int{1, 2, 3, 4}

    resultCh := make(chan Result)

    // start the consumer that will send results and errors
    go consumer(generator(input), resultCh)

    // read the results
    for res := range resultCh {
        if res.err != nil {
            log.Println("Error:", res.err)
        } else {
            log.Println("Result:", res.data)
        }
    }
}

// generator sends data into a channel
func generator(input []int) chan int {
    inputCh := make(chan int)

    go func() {
        defer close(inputCh)
        for _, data := range input {
            inputCh <- data
        }
    }()

    return inputCh
}

// consumer calls a function that may return an error
func consumer(inputCh chan int, resultCh chan Result) {
    defer close(resultCh)

    for data := range inputCh {
        resp, err := callDatabase(data)
        resultCh <- Result{data: resp, err: err}
    }
}

// callDatabase returns an error
func callDatabase(data int) (int, error) {
    return data, errors.New("database request error")
}

Errors are now passed through the resultCh channel to the main goroutine, where they can be worked on. This is a convenient way to handle errors in multi-threaded tasks.

However, there is also errgroup from the golang.org/x/sync/errgroup package, which provides synchronization, error propagation, and context cancellation for groups of goroutines. It is useful when you want to run several parallel tasks and make sure they all complete successfully: if one of the goroutines returns an error, errgroup passes it to the main goroutine, which can handle it.

Here's the code:

package main

import (
    "context"
    "errors"
    "log"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, _ := errgroup.WithContext(context.Background())
    input := []int{1, 2, 3, 4}

    inputCh := generator(input)

    for data := range inputCh {
        data := data // capture a fresh variable for each goroutine (no longer needed since Go 1.22)
        g.Go(func() error {
            return callDatabase(data)
        })
    }

    if err := g.Wait(); err != nil {
        log.Println("Error:", err)
    }
}

// generator sends data into a channel
func generator(input []int) chan int {
    inputCh := make(chan int)

    go func() {
        defer close(inputCh)
        for _, data := range input {
            inputCh <- data
        }
    }()

    return inputCh
}

// callDatabase returns an error when data equals 3
func callDatabase(data int) error {
    if data == 3 {
        return errors.New("database request error")
    }
    return nil
}

If an error occurs in one of the goroutines, it is returned to the main thread of the program, where it can be handled. For example, since callDatabase returns an error when data equals 3, the output will be:

2024/10/22 21:17:40 database request error

The errgroup package is useful when you need to run multiple parallel tasks and make sure they complete. If something goes wrong, you don't need to worry about how many errors occurred: it's enough to handle the first one that errgroup returns.

The patterns we've covered are a powerful tool that will help you distribute tasks effectively between goroutines and improve program performance. They let you process large amounts of data, run tasks in parallel, and speed up complex calculations. However, they must be applied judiciously.

Using multithreading where it isn't needed can not only complicate your code but also make it less readable, slow it down, and introduce errors. Blindly applying patterns where simple synchronous execution would do often creates more problems than it solves.

Therefore, it is always important to weigh whether it is worth splitting a task into several threads, or whether it is better to execute it sequentially and synchronously. A good developer not only knows patterns, but also knows how to choose the right tool for a specific task.

Under my last article about functional options there were a number of negative comments, like “Why is this necessary?” and “Look, you can do better.” Surely there will be such commentators under this article too. You are all absolutely right! You are great, you have asserted yourself and I am not going to argue with you 🙂 My goal is to describe these patterns in one place, in as much detail as possible and most importantly in Russian, so that those who want to develop can understand them and use them in the future.

Thank you for your time, I hope the material was useful 🙂

Bye!
