Kafka, Go and parallel queues

Imagine an online store where users make purchases. The microservices that handle user requests send a message to kafka about each completed purchase. Kafka has one topic with one partition, from which a microservice reads messages and builds a purchase history for each user. One day there were so many simultaneous purchases that the history of many users was not updated for several hours.

Why did this happen and how to avoid it? Let’s figure it out.

System model

Let’s simulate this situation using three small Go programs and kafka running in docker.

System model diagram


What is shown in the diagram?

gen – a Go utility that simulates user requests and generates “a lot” of messages. In our case there are only 9, which is enough for the demonstration.

P0 – the only kafka partition (P from the word “partition”). The zero is present because later another partition will be added to the diagram and the two will need to be distinguished.

A:1, A:2 and others – messages generated by the gen utility. All messages in the diagram are shown in partition P0 in the order they were written. The order is important – otherwise the purchase history would be inconsistent. Each message is identified by its key, a colon, and its value. The values contain numbers that let you track the order of the messages.

service – a Go server that reads and processes messages. It models the microservice that builds the purchase history.

H0 – the message handler (H from the word “handler”). The zero is there for the same reason as in P0: later we will add another handler. A handler is a goroutine that writes messages to the database.

A.db, B.db and C.db – a Go server that models a DBMS; it doesn’t matter which one. You can think of A.db, B.db and C.db as postgresql shards. The letters in the names are not random – handler H0 sends all messages with key A to A.db, messages with key B to B.db, and messages with key C to C.db.

The diagram also shows a consumer (reads messages from kafka) and a producer (writes messages to kafka). These are kafka concepts whose implementation varies with the programming language and kafka client library. I’m using the segmentio/kafka-go library, in which these concepts correspond to the kafka.Writer and kafka.Reader structures.

Messages are queued

We will not dwell on unimportant implementation details of gen and service here or later in the article – their source code is available separately. However, we do need to explain what A.db, B.db and C.db in the diagram do and how they are implemented.

The implementation of the DBMS model is simple and has two features:

  • at one point in time the model processes one message (sync.Mutex);

  • processing one message takes 1 second (time.Sleep).

var dbLock = sync.Mutex{}

func writeNumber(c *fiber.Ctx) error {
  dbLock.Lock()
  defer dbLock.Unlock()

  time.Sleep(1 * time.Second)

  slog.Info("written",
    "number", c.Params("number"),
  )

  return nil
}

The model is implemented as an HTTP request handler using the fiber web framework. Messages are not placed in persistent storage – they are simply printed to the terminal.

If you run all the elements of the scheme, the logs show that each of the nine messages indeed takes one second to process.

DBMS model logs: messages in one queue


Another important observation: messages are processed sequentially, one by one. Message handler H0 sends a message, waits one second for the DBMS response, and only then moves on to the next message.

             9 seconds
-----------------------------------
C:3 C:2 C:1 B:3 B:2 B:1 A:3 A:2 A:1    

So where is the queue here? The queue is partition P0, from which messages are delivered to message handler H0 to be sent one by one to the corresponding DBMS: A.db, B.db or C.db.

One shared message queue


One big queue is the very reason the purchase history in the online store took several hours to update. In practice this means that if there are 10,000 messages in the queue, the last message waits until the 9,999 messages ahead of it have been processed. Here you might object: any HTTP server in Go handles each request in a separate goroutine, yet here we have a single handler goroutine H0 and a huge message queue. Is working with kafka really subject to such restrictions? No! It’s just that HTTP servers encapsulate the concurrent code for you, while with kafka you have to write the concurrent code yourself. I will show how this can be done below. But first, let’s do the math.

Message flows

DBMS A.db processes one message per second, and B.db also processes one message per second. If A.db and B.db each work on one message at the same time, together they process 2 messages per second. Connect C.db as well, and 3 messages are processed per second. All 9 messages are then processed in 3 seconds.

3 seconds
-----------
A:3 A:2 A:1
B:3 B:2 B:1
C:3 C:2 C:1

In the A.db DBMS, messages A:1, A:2 and A:3 must be written sequentially. Note that these three messages have a common part – the key A – and a requirement on processing order. For the convenience of further discussion, I will introduce the concept of a message flow: a set of messages with a common identifier and an established processing order.

A:3 A:2 A:1 - the message flow with identifier A
B:3 B:2 B:1 - the message flow with identifier B
C:3 C:2 C:1 - the message flow with identifier C

Partition queue P0 contains three message flows, and the message key serves as the flow identifier. Let’s start by logically separating these flows.

Message flows

It turns out that parallelization of message processing should happen at the level of message flows. Indeed, to speed up processing of the existing flows to 3 messages per second, the messages of each flow must be placed in a separate queue with its own handler.
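The idea of logically splitting a partition’s contents into flows can be sketched in a few lines of Go. This is only an illustration of the concept – the Message type and the splitIntoFlows function are mine, not part of the article’s source code:

```go
package main

import "fmt"

type Message struct{ Key, Value string }

// splitIntoFlows groups messages by key while preserving per-key order —
// exactly the "message flow" notion: a common identifier plus an order.
func splitIntoFlows(msgs []Message) map[string][]string {
  flows := make(map[string][]string)
  for _, m := range msgs {
    flows[m.Key] = append(flows[m.Key], m.Value)
  }
  return flows
}

func main() {
  p0 := []Message{
    {"A", "1"}, {"A", "2"}, {"A", "3"},
    {"B", "1"}, {"B", "2"}, {"B", "3"},
    {"C", "1"}, {"C", "2"}, {"C", "3"},
  }
  fmt.Println(splitIntoFlows(p0)) // map[A:[1 2 3] B:[1 2 3] C:[1 2 3]]
}
```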

In the online store, a message flow is the set of messages about one user’s purchases, and the user ID is the flow identifier. Since the order of purchases in the history matters for each user individually, the messages of each flow can be processed independently of the others.

Creating queues using kafka partitions

The first mechanism for creating queues that we will consider is implemented mainly on the kafka side, namely by increasing the number of partitions. As established earlier, maximum processing speed requires three parallel queues. However, in real life it is not always possible or advisable to create one partition per message flow, so we will add only one additional partition to our scheme.

Add a partition and a consumer: both consumers in one application


There are now two partitions. For the further discussion of partitions and consumers to make sense, let’s agree that in the system model under consideration all consumers are members of a single consumer group.

We launch two consumers so that each partition queue has its own message handler. If you run fewer consumers than there are partitions, some consumer will inevitably read messages from more than one partition. The partitions read by a single consumer then cease to be separate queues: they become one shared queue, because the consumer processes messages from the partitions in turn, one after another, and there is no parallelism. This is an important observation, and it shows that a parallel queue is made up of two components:

  • the message queue itself (in this case it is a partition);

  • the message handler (in this case the handler and the consumer are one and the same).

In the figure above I depicted only one of the options for increasing the number of consumers – several consumers launched in one application. Another option is simply to run two instances of the application.

Add a partition and a consumer: each consumer in a separate application


You can also use a hybrid approach – launch several instances of the application and run several consumers in each. The kafka server does not care how the consumers are launched or in what programming language they are implemented.

Splitting a topic into partitions makes it possible to run instances of service on different physical machines. If the consumers run on different physical machines (and therefore on different processors), messages from the queues are processed in parallel rather than merely concurrently on a single processor. This is useful when message processing involves resource-intensive computation on the same processor that runs the consumer.

One more advantage of partitions is worth highlighting. Imagine there is only one partition and an error occurs while processing one of the messages. Your application retries the message, and while the retries fail, the other messages in the partition wait their turn. With more than one partition, an error in processing a message from one partition does not affect the processing of messages from the other partitions.

For definiteness, we will continue with the variant of two instances of service.

Distributing messages into queues

We have now settled on partitions and consumers. Let’s launch the gen utility.

An additional partition broke the order of message processing within the flows

A problem is discovered immediately: the order of messages within the flows is broken. When there was only one partition, the producer in the gen utility did not have to think about which partition to put a message in. Now there are two partitions, and the producer needs an algorithm for selecting one. The segmentio/kafka-go library provides such an algorithm by default – the well-known round robin. It works as follows: the first message written to kafka, A:1, goes to partition P0; the next message, A:2, goes to P1; there are no more partitions, so A:3 goes to P0 again – and so on in a circle.

To demonstrate the broken ordering more clearly, in the diagram above I deliberately shifted the messages in partition P1 slightly “back in time”. In reality such a shift can occur, for example, because of network delays. According to the scheme, the messages then arrive at the H0 handlers in the following order.

A:1 A:3 B:2 C:1 A:2 C:3 B:1 B:3 C:2

So, the number of partitions has been increased, but the message processing order has been broken. The reason is that the default algorithm for distributing messages across partitions does not take our message flows A, B and C into account. This is natural, because the developers of segmentio/kafka-go know nothing about these flows.
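Here is a small self-contained sketch of what round robin does to our flows. The roundRobin function is my illustration of the principle, not the library’s implementation:

```go
package main

import "fmt"

// roundRobin distributes messages across partitions in writing order,
// ignoring keys — the default behavior the article describes.
func roundRobin(msgs []string, partitions int) [][]string {
  buckets := make([][]string, partitions)
  for i, m := range msgs {
    buckets[i%partitions] = append(buckets[i%partitions], m)
  }
  return buckets
}

func main() {
  msgs := []string{"A:1", "A:2", "A:3", "B:1", "B:2", "B:3", "C:1", "C:2", "C:3"}
  for p, b := range roundRobin(msgs, 2) {
    fmt.Printf("P%d: %v\n", p, b)
  }
  // P0: [A:1 A:3 B:2 C:1 C:3]
  // P1: [A:2 B:1 B:3 C:2]
  // Every flow is split across both partitions, so its order is lost.
}
```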

Let’s replace round robin with an algorithm that does take our flows into account.

We distribute messages among partitions correctly using a suitable algorithm


Here f(S)=P denotes an algorithm that assigns a partition number (P, partition) to each message flow (S, stream). The segmentio/kafka-go library allows you to implement the Balancer interface in order to direct message flows to the appropriate partitions. Below is an implementation of this interface, that is, a concrete algorithm f(S)=P.

type SPBalancer struct{}

// Balance implements f(S)=P: the FNV-1a hash of the message key,
// taken modulo the number of partitions.
func (b *SPBalancer) Balance(msg kafka.Message, partitions ...int) (partition int) {
  hash := func(s string) int {
    h := fnv.New32a()
    h.Write([]byte(s))
    return int(h.Sum32())
  }

  return partitions[hash(string(msg.Key))%len(partitions)]
}

spBalancer := &SPBalancer{}
spBalancer.Balance(kafka.Message{Key: []byte("A")}, 0, 1) // returns 0
spBalancer.Balance(kafka.Message{Key: []byte("B")}, 0, 1) // returns 1
spBalancer.Balance(kafka.Message{Key: []byte("C")}, 0, 1) // returns 0

The algorithm uses the flow identifier – in our case stored in the message key – to determine the partition number for the message. Quite often the f(S)=P algorithm is built exactly this way: compute a hash of the flow identifier and take the remainder of dividing it by the number of partitions. In this example there are only two partitions, so distributing messages between them is easy, but in real systems designing an f(S)=P that distributes messages across partitions as evenly as possible, together with choosing the flow identifier, is a non-trivial task.
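For completeness, the same hash-modulo scheme extracted into a plain, runnable function. The function name partitionFor is mine; the expected outputs match the spBalancer example above:

```go
package main

import (
  "fmt"
  "hash/fnv"
)

// partitionFor computes f(S)=P the same way as SPBalancer:
// FNV-1a hash of the flow identifier modulo the partition count.
func partitionFor(flowID string, partitions int) int {
  h := fnv.New32a()
  h.Write([]byte(flowID)) // hash/fnv writers never return an error
  return int(h.Sum32()) % partitions
}

func main() {
  for _, key := range []string{"A", "B", "C"} {
    fmt.Printf("%s -> P%d\n", key, partitionFor(key, 2))
  }
  // A -> P0
  // B -> P1
  // C -> P0
}
```

The same key always maps to the same partition, which is exactly the property that keeps a flow inside one queue.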

Intermediate result: we now have two full-fledged message queues based on partitions.

DBMS model logs: messages in two parallel queues


However, flows A and C are still waiting in the same queue.

        6 seconds
-----------------------
C:3 C:2 C:1 A:3 A:2 A:1
            B:3 B:2 B:1

So, it’s time to consider the following mechanism for creating queues.

Creating queues using Go channels

A parallel consumer is a queuing mechanism implemented in Go using channels. The channels play the role of queues, and for each queue to be complete, every channel must have its own message handler – recall that a message handler is a goroutine. Let’s take a closer look at a parallel consumer that sends messages to the A.db and C.db DBMS.

The principle of operation of a parallel consumer


The consumer retrieves a batch of messages from the partition and distributes them across the channel queues. Here f(S)=H maps message flows to handler numbers (H, handler), again with the goal of preserving message order. As a result, each channel contains messages from only one flow.
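A minimal sketch of this fan-out in Go, with channels as queues and one goroutine per channel. The fanOut function and the fixed route table are mine and stand in for f(S)=H; in a real parallel consumer the mapping would typically be computed from a hash of the key:

```go
package main

import (
  "fmt"
  "sync"
)

type Message struct{ Key, Value string }

// fanOut routes each message to the channel chosen by f(S)=H (here the
// route table), so per-flow order is kept while flows run concurrently.
func fanOut(batch []Message, handlers int, route map[string]int) [][]string {
  queues := make([]chan Message, handlers)
  results := make([][]string, handlers)
  var wg sync.WaitGroup

  for i := range queues {
    queues[i] = make(chan Message)
    wg.Add(1)
    go func(i int) { // handler Hi: drains its own queue sequentially
      defer wg.Done()
      for m := range queues[i] {
        results[i] = append(results[i], m.Key+":"+m.Value)
      }
    }(i)
  }

  for _, m := range batch {
    queues[route[m.Key]] <- m // f(S)=H
  }
  for _, q := range queues {
    close(q)
  }
  wg.Wait()
  return results
}

func main() {
  route := map[string]int{"A": 0, "B": 1, "C": 2} // illustrative f(S)=H
  batch := []Message{
    {"A", "1"}, {"B", "1"}, {"C", "1"},
    {"A", "2"}, {"B", "2"}, {"C", "2"},
    {"A", "3"}, {"B", "3"}, {"C", "3"},
  }
  fmt.Println(fanOut(batch, 3, route)) // [[A:1 A:2 A:3] [B:1 B:2 B:3] [C:1 C:2 C:3]]
}
```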

As a result, the system will take the following form.

A parallel consumer has appeared in the system model


We have brought the system to a configuration in which each message flow waits for processing in its own queue.

DBMS model logs: messages in three parallel queues


This is the maximum number of parallel queues that can be created to process messages in an orderly manner. But is the maximum number of queues always appropriate?

Knowing when to stop

It is important to understand which message flows exist in the system and to estimate their number. For example, if there is only one flow, there is simply nothing to parallelize – one queue is enough.

Another reason to limit the number of queues is the load on the system that the message handlers call. In our case the handlers call a system of three DBMSs: A.db, B.db and C.db. With three DBMSs, all messages are processed in 3 seconds, because we have three flows, three queues and three DBMSs. But let’s imagine there is only one DBMS, and it processes messages from all the flows.

One instance of the DBMS model cannot process messages from three threads simultaneously


Recall that our DBMS model processes only one message at a time thanks to sync.Mutex. Concurrent requests will therefore still queue up – not in kafka partitions or in the channels of service, but inside the single DBMS. As a result, the 9 messages are again processed one by one, taking 9 seconds. The point is this: it is very easy to launch a parallel consumer with 10,000,000 message handlers, but you need to understand whether your DBMS can cope with that many concurrent requests.
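If you still want many handlers but need to protect the DBMS, a counting semaphore is a common way to cap concurrency. Below is a sketch under assumed numbers (10 handlers, a DBMS that tolerates at most 3 concurrent requests); the function name runWithLimit is mine:

```go
package main

import (
  "fmt"
  "sync"
  "sync/atomic"
  "time"
)

// runWithLimit launches `handlers` goroutines but lets at most `capacity`
// of them talk to the "DBMS" at once, via a buffered-channel semaphore.
// It returns true if the concurrency cap was never exceeded.
func runWithLimit(handlers, capacity int) bool {
  sem := make(chan struct{}, capacity)
  var inFlight, overload int32
  var wg sync.WaitGroup

  for i := 0; i < handlers; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      sem <- struct{}{} // blocks while `capacity` requests are in flight
      if atomic.AddInt32(&inFlight, 1) > int32(capacity) {
        atomic.StoreInt32(&overload, 1)
      }
      time.Sleep(10 * time.Millisecond) // the 1-second model, scaled down
      atomic.AddInt32(&inFlight, -1)
      <-sem // release the slot
    }()
  }
  wg.Wait()
  return atomic.LoadInt32(&overload) == 0
}

func main() {
  fmt.Println(runWithLimit(10, 3)) // prints: true
}
```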

Conclusion

In conclusion, here is a short plan for solving the problem of the slow purchase history updates in the online store from the beginning of the article.

You can start by introducing a parallel consumer. This step does not require increasing the number of partitions, which in some cases is simply impossible. If adding partitions is an option, you can add them and launch several instances of the microservice that reads kafka messages. Each message flow in such a system corresponds to the purchase sequence of a specific user, and the user ID is the natural choice of flow identifier. The number of queues is convenient to regulate with the parallel consumer – changing the number of Go channels is easier than adding or removing a partition, especially if the number of channels is specified in the microservice configuration.

List of sources

A description of the basic concepts of kafka can be found at the following links:

The source code of the java library Confluent Parallel Consumer (from which I borrowed the term “parallel consumer”) is located at this link. The library implements message queues inside a kafka consumer, and its documentation describes techniques for parallel processing of kafka messages along with many usage scenarios.

Links to go libraries mentioned in the article:

The source code of the applications discussed in the article is located here.
