We are told that Apache Kafka preserves the order of messages per topic/partition, but how true is that? In this article, we will analyze several real-world scenarios in which blind acceptance of this dogma can lead to unexpected and misleading message sequences.
Base case: one producer
Let’s start with a basic scenario: one producer sends messages to an Apache Kafka topic with a single partition, sequentially, one after the other.
In this basic situation, according to the famous mantra, we should always expect the right order. But is that so? It depends!
In an ideal world, a single producer scenario should always result in the correct order. But our world is not perfect! Various network paths, errors, and delays can cause a message to be delayed or lost.
Let’s imagine the situation below: one producer sends three messages to a topic:
Message 1 finds a long network route to Apache Kafka for some reason
Message 2 finds the fastest network route to Apache Kafka
Message 3 gets lost in the network
Even in this basic scenario with only one producer, we might get an unexpected sequence of messages in the topic. The end result in the Kafka topic will show only two persisted events, in an unexpected order: 2, 1.
From the point of view of Apache Kafka, this is the correct order. A topic is just a log of information, and Apache Kafka writes messages to the log depending on when it “senses” a new event arriving. The order is based on the time Kafka receives the message (ingestion time), not on the time the message was created (event time).
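To make the arrival-order behaviour concrete, here is a minimal Python sketch. It is a toy model, not Kafka code: the `Message` class, the `append_in_arrival_order` helper, and all timings are made up for illustration, and a delay of `None` stands for a message lost on the network.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Message:
    msg_id: int                     # order in which the producer sent it
    sent_at: float                  # event time in seconds (illustrative)
    network_delay: Optional[float]  # None means the message was lost


def append_in_arrival_order(messages: List[Message]) -> List[int]:
    """Return message ids in the order a log would store them.

    The log only knows when a message arrives, so it orders by
    sent_at + network_delay and never sees lost messages.
    """
    delivered = [m for m in messages if m.network_delay is not None]
    delivered.sort(key=lambda m: m.sent_at + m.network_delay)
    return [m.msg_id for m in delivered]


# Hypothetical timings matching the scenario above.
sent = [
    Message(1, sent_at=0.0, network_delay=0.9),   # slow route
    Message(2, sent_at=0.1, network_delay=0.2),   # fast route
    Message(3, sent_at=0.2, network_delay=None),  # lost
]
topic_log = append_in_arrival_order(sent)
print(topic_log)
```

With these assumed timings the log ends up holding message 2 followed by message 1, and message 3 never appears.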
Acknowledgments and retries
But not all is lost! If we look into the producer libraries (for example, aiokafka), we find that there are ways to ensure that messages are delivered correctly.
First of all, to avoid the problem with message 3 in the above scenario, we could define a proper acknowledgment mechanism. The producer parameter acks lets us define which message acknowledgment we want to receive from Apache Kafka.
Setting this option to 1 ensures that we receive an acknowledgment from the leader broker responsible for the topic (and partition). Setting it to all guarantees that we receive an acknowledgment only if both the leader and the in-sync replicas store the message correctly. This protects us from the case in which only the leader receives the message and crashes before propagating it to the replicas.
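As a rough sketch, the two settings might be written down for aiokafka as follows. `acks` is the parameter named in the text; the broker address `localhost:9092`, the dictionary names, and the `send_one` helper are assumptions for illustration. The `aiokafka` import is deferred so the settings can be inspected without the library or a running broker.

```python
# Acknowledgment from the leader broker only.
LEADER_ONLY = {
    "bootstrap_servers": "localhost:9092",  # assumed address
    "acks": 1,
}

# Acknowledgment only once the replicas also hold the message.
LEADER_AND_REPLICAS = {**LEADER_ONLY, "acks": "all"}


async def send_one(settings: dict, topic: str, payload: bytes) -> None:
    """Send one message and wait for the acknowledgment chosen in settings."""
    from aiokafka import AIOKafkaProducer  # third-party, imported lazily

    producer = AIOKafkaProducer(**settings)
    await producer.start()
    try:
        await producer.send_and_wait(topic, payload)
    finally:
        await producer.stop()
```

Calling `asyncio.run(send_one(LEADER_AND_REPLICAS, "my-topic", b"hello"))` would need a reachable broker; the topic name here is hypothetical.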
After we have set acks, we have to make it possible to resend a message if we do not receive a proper acknowledgment. Unlike other libraries (one of them is kafka-python), aiokafka automatically retries sending the message until the timeout (set by a producer parameter) is exceeded.
With acknowledgments and automatic retries, we should have solved the problem with message 3. On the first send, the producer will not receive the ack, so after the retry_backoff_ms interval it will send the message again.
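The retry behaviour can be sketched with a small simulation. This is a toy model rather than aiokafka internals: the `NETWORK` table, the `deliver` helper, and every timing value (including the ack timeout) are assumptions chosen to match the three-message scenario.

```python
from typing import Dict, List, Optional, Tuple

RETRY_BACKOFF_S = 0.1  # stands in for retry_backoff_ms (value assumed)
ACK_TIMEOUT_S = 0.5    # how long the toy producer waits for an ack (assumed)

# Hypothetical network: delay per (message id, attempt); None = lost.
NETWORK: Dict[Tuple[int, int], Optional[float]] = {
    (1, 0): 0.9,   # slow route
    (2, 0): 0.2,   # fast route
    (3, 0): None,  # first attempt lost, so no ack comes back
    (3, 1): 0.2,   # the retry gets through
}


def deliver(msg_id: int, sent_at: float) -> float:
    """Resend until the message arrives; return its arrival time."""
    attempt = 0
    while True:
        delay = NETWORK[(msg_id, attempt)]
        if delay is not None:
            return sent_at + delay
        # No ack: wait for the timeout plus the backoff, then retry.
        sent_at += ACK_TIMEOUT_S + RETRY_BACKOFF_S
        attempt += 1


send_times = {1: 0.0, 2: 0.1, 3: 0.2}
arrivals = {m: deliver(m, t) for m, t in send_times.items()}
topic_log: List[int] = sorted(arrivals, key=lambda m: arrivals[m])
print(topic_log)
```

In this model all three messages now reach the log, but they are stored in arrival order, which is 2, 1, 3 rather than the order in which they were sent.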
Max number of in-flight requests
However, if you take a close look at the end result in the Apache Kafka topic, the resulting order is wrong: we sent 1, 2, 3 and got 2, 1, 3 in the topic. How do we fix this?
The old method (available in kafka-python) was to set the maximum number of in-flight requests per connection: this is the number of messages that we allow to be “in the air” at the same time without acknowledgment. The more messages we have in flight at the same time, the greater the risk of getting messages out of order.
When using kafka-python, if we absolutely needed a specific order in a topic, we were forced to restrict the maximum number of in-flight requests per connection to 1. Assuming the acks parameter is set to at least 1, we then wait for each individual message (or batch of messages, if the message size is smaller than the batch size) to be acknowledged before sending the next one.
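For reference, here is a hedged sketch of such a configuration in kafka-python, which exposes this limit as `max_in_flight_requests_per_connection`. The broker address, the retry count, and the `make_producer` helper are assumptions for illustration; the import is deferred because creating a `KafkaProducer` tries to contact the broker.

```python
STRICT_ORDER_SETTINGS = {
    "bootstrap_servers": "localhost:9092",       # assumed address
    "acks": 1,                                   # wait for the leader's ack
    "retries": 5,                                # assumed retry budget
    "max_in_flight_requests_per_connection": 1,  # one unacknowledged request
}


def make_producer(settings: dict):
    """Create a kafka-python producer; needs a reachable broker."""
    from kafka import KafkaProducer  # third-party, imported lazily

    return KafkaProducer(**settings)
```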
Absolute correctness of ordering, acknowledgments, and retries comes at the cost of throughput. The fewer messages we allow to be “in the air” at the same time, the more acknowledgments we have to wait for, and the fewer messages overall we can deliver to Kafka in a given period of time.
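A back-of-the-envelope model shows the size of this cost. It is deliberately simplified: it assumes one message per request, a fixed round-trip time, and that a new window of requests starts only once the previous window is fully acknowledged. The function name and the numbers are illustrative.

```python
import math


def send_time_s(n_messages: int, max_in_flight: int, round_trip_s: float) -> float:
    """Time until n_messages are acknowledged in the simplified model."""
    windows = math.ceil(n_messages / max_in_flight)
    return windows * round_trip_s


for window in (1, 5):
    total = send_time_s(1000, window, round_trip_s=0.01)
    print(f"max_in_flight={window}: {total:.1f}s, {1000 / total:.0f} msg/s")
```

Under these assumptions, allowing five requests in flight delivers the same 1000 messages in a fifth of the time needed with a single request in flight.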
To overcome the strict serialization of sending one message at a time and waiting for an acknowledgment, we can define idempotent producers. When using an idempotent producer, each message is tagged with a producer ID and a sequence number (a sequence is maintained for each partition). This composed ID is then sent to the broker along with the message.
The broker keeps track of the sequence number for each producer and topic/partition. When a new message arrives, the broker checks the composed ID: if, for the same producer, the value equals the previous number + 1, the new message is acknowledged; otherwise it is rejected. This guarantees message ordering for each producer and partition while allowing more in-flight requests per connection (a maximum of 5 for the Java client).
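The sequence check described above can be sketched as a toy broker. This is an illustration of the rule only, not Kafka's implementation (a real broker treats duplicates and error cases in more detail); the `ToyBroker` class, the producer ID 7, and the message values are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class ToyBroker:
    """Accepts a message only if its sequence number is the next expected."""

    def __init__(self) -> None:
        # Last accepted sequence number per (producer id, partition).
        self.last_seq: Dict[Tuple[int, int], int] = defaultdict(lambda: -1)
        self.log: List[str] = []

    def append(self, producer_id: int, partition: int, seq: int, value: str) -> bool:
        key = (producer_id, partition)
        if seq != self.last_seq[key] + 1:
            return False  # gap or duplicate: rejected, nothing is written
        self.last_seq[key] = seq
        self.log.append(value)
        return True


broker = ToyBroker()
results = [
    broker.append(producer_id=7, partition=0, seq=1, value="msg-2"),  # too early
    broker.append(producer_id=7, partition=0, seq=0, value="msg-1"),
    broker.append(producer_id=7, partition=0, seq=1, value="msg-2"),  # retried
    broker.append(producer_id=7, partition=0, seq=0, value="msg-1"),  # duplicate
]
print(results, broker.log)
```

The early arrival and the duplicate are both rejected, so the log keeps the producer's original order even though the requests overlapped.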
Making it harder: multiple producers
So far, we have presented a basic scenario with only one producer, but the reality of Apache Kafka is that there are often multiple producers. Which details do we need to be aware of if we want to be sure of the final result?
Different locations, different latency
Again, networks are not all equal, and multiple producers located in very distant places may experience different latencies. This means that the order in Kafka may differ from the order based on event time.
Unfortunately, we cannot fix the delays between different points on Earth, so we will have to accept this scenario.
Batching, an additional variable
To achieve higher throughput, we can batch messages. With batching, we send messages “in groups”, minimizing the total number of calls and increasing the ratio of payload to total message size. But in doing so, we can again change the order of events. Messages in Apache Kafka are stored per batch, based on the time the batch was received. This way the message order will be correct within each batch, but messages from different batches may be interleaved out of event-time order.
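A short sketch of the batching effect, using made-up data: two producers each send one batch, and the log stores each batch as a unit in the order the batches arrive. The event labels, event times, and batch arrival times are all hypothetical.

```python
from typing import List, Tuple

Event = Tuple[float, str]  # (event time in seconds, label)

# Each producer groups its events into one batch, kept in event-time order.
batch_a: List[Event] = [(1.0, "a1"), (3.0, "a2"), (5.0, "a3")]
batch_b: List[Event] = [(2.0, "b1"), (4.0, "b2")]

# Hypothetical arrival times of the two batches at the broker.
arrivals = [(6.2, batch_b), (6.5, batch_a)]

topic_log: List[Event] = []
for _, batch in sorted(arrivals, key=lambda item: item[0]):
    topic_log.extend(batch)  # a batch is stored as a unit

labels = [label for _, label in topic_log]
event_times = [t for t, _ in topic_log]
print(labels)
print(event_times == sorted(event_times))
```

Each batch is internally ordered, yet the log as a whole is not sorted by event time.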
With different latencies and batching in play, it seems that our global ordering assumption is bound to fail. So why do we claim to be able to manage events in order?
Savior: event time
We have seen that the original assumption that Kafka preserves message order is not 100% correct. The order of the messages depends on the time Kafka receives them, not on the time the events were generated. But what if we really care about the order based on event time?
Well, we can’t solve the problem on the producer side, but we can do it on the consumer side. All of the most common tools that work with Apache Kafka can define which field to use as the event time, including Kafka Streams, Kafka Connect with its dedicated Single Message Transformation (SMT) to extract timestamps, and Apache Flink.
Consumers, if properly defined, will be able to reorder the messages coming from a particular Apache Kafka topic. Let’s analyze the Apache Flink example below:
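CREATE TABLE CPU_IN (
    -- Assumed columns and types; the physical columns are missing from the listing.
    cpu STRING,
    occurred_at BIGINT,
    -- Event time derived from the Unix-time value.
    time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
    -- Tolerate events that arrive up to 10 seconds late.
    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'topic' = 'cpu_load_stats_real',
    'value.format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);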
In the Apache Flink table definition above, we can notice:
occurred_at: the field defined in the source Apache Kafka topic, in Unix time (epoch milliseconds, given the precision argument 3 used in the conversion).
time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3): converts the Unix time to a Flink timestamp.
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND: declares the new time_ltz field (calculated from occurred_at) as the event time and sets the late-arrival threshold, so that events may be delayed by at most 10 seconds.
Once the above table is defined, the time_ltz field can be used to correctly order events and define aggregation windows, ensuring that all events within the allowed delay are included in the calculations.
INTERVAL '10' SECOND defines the latency of the data pipeline and is the penalty we have to accept in order to correctly include late-arriving events. Note, however, that throughput is not affected. We can have as many messages in our pipeline as we want, but we “wait 10 seconds” before calculating any final KPI to make sure we include all the events in a given time period.
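The idea of trading latency for correct ordering can be illustrated with a small reorder buffer. This is a conceptual Python sketch, not how Apache Flink is implemented: the `reorder` function, the event tuples, and the decision to drop events that fall behind the watermark are assumptions made for the example.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple

Event = Tuple[int, str]  # (event time in seconds, payload)


def reorder(events: Iterable[Event], max_delay_s: int = 10) -> Iterator[Event]:
    """Yield events in event-time order, tolerating bounded lateness.

    The watermark trails the highest event time seen by max_delay_s.
    Buffered events at or below the watermark are released; events that
    arrive already behind the watermark are dropped as too late.
    """
    buffer: List[Event] = []
    watermark = float("-inf")
    for event in events:
        if event[0] <= watermark:
            continue  # later than the allowed delay
        heapq.heappush(buffer, event)
        watermark = max(watermark, event[0] - max_delay_s)
        while buffer and buffer[0][0] <= watermark:
            yield heapq.heappop(buffer)
    while buffer:  # end of input: flush what is left
        yield heapq.heappop(buffer)


arrived = [(100, "a"), (104, "b"), (98, "c"), (115, "d"), (101, "e"), (130, "f")]
ordered = list(reorder(arrived))
print(ordered)
```

Event `c` arrives out of order but within the 10-second allowance, so it is emitted in its correct position; event `e` arrives after the watermark has passed 101 and is left out in this sketch.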
An alternative approach, which only works if the events contain the full state, is to store for a specific key (cpu in the example above) the maximum event time reached so far, and to accept changes only if the new event time is greater than the stored maximum.
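A minimal sketch of this alternative follows, assuming each event carries the full state for its key. The `latest` dictionary, the `apply_update` function, and the sample values are hypothetical.

```python
from typing import Dict, Tuple

# Latest accepted (event time, state) per key.
latest: Dict[str, Tuple[int, float]] = {}


def apply_update(key: str, event_time: int, state: float) -> bool:
    """Keep the update only if it is newer than what is stored for the key."""
    current = latest.get(key)
    if current is not None and event_time <= current[0]:
        return False  # stale event: ignore it
    latest[key] = (event_time, state)
    return True


accepted = [
    apply_update("cpu1", 100, 0.42),
    apply_update("cpu1", 105, 0.80),
    apply_update("cpu1", 103, 0.55),  # arrives late, older than 105
    apply_update("cpu2", 101, 0.10),
]
print(accepted, latest)
```

The late event for `cpu1` is ignored, so the stored state always reflects the most recent event time seen for each key.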
The concept of ordering in Kafka can be tricky even if we consider only one topic with one partition. This post covered a few common situations that can lead to an unexpected order of events. Fortunately, options such as limiting the number of in-flight messages or using idempotent producers can help achieve an ordering that matches expectations. In the case of multiple producers and unpredictable network latency, we are left only with the ability to fix the overall order on the consumer side by properly handling the event time, which needs to be specified in the payload.