Managing offsets in Kafka

An offset in Kafka is a numeric identifier that indicates the position of a message within a topic partition. Offsets are sequential numbers, starting from zero, and are unique within each partition but not across partitions. That is, a message with offset 5 in partition 1 and a message with offset 5 in partition 2 are different messages.

How offsets work:

Each new message in the partition receives the next offset in order. For example, if the last message in a batch had an offset of 10, the next message will receive an offset of 11. This way you can track which messages have been read.

Once a message has been written to a partition, its offset does not change and the record is not overwritten. That is, data in Kafka is immutable.

Consumers can start reading from any offset in the partition and continue from that point.
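
For example, a consumer can be positioned at an arbitrary offset with seek(). The sketch below assumes an already configured KafkaConsumer<String, String> named consumer and an illustrative topic name and offset:

TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(partition));
consumer.seek(partition, 42); // continue reading from offset 42 of partition 0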

Offsets allow you to implement various message delivery guarantees in Kafka.

Three main types of message delivery guarantees

at-most-once

at-most-once guarantees that a message will be delivered once or not at all. This is the simplest approach and aims to minimize latency, but it also carries a risk of data loss. With these semantics, the producer does not wait for confirmation from the broker that the message has been received. This improves performance, but increases the likelihood of data loss due to network failures or a broker failure.

To achieve at-most-once semantics, the producer is configured with the acks parameter set to 0. The producer sends a message and immediately moves on to the next one, without waiting for confirmation from the broker: "fire and forget."
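
A minimal sketch of such a "fire and forget" producer, assuming a local broker and an illustrative topic name:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "0"); // do not wait for broker acknowledgment

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value")); // fire and forget
producer.close();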

At-most-once semantics are recommended for systems where the loss of some messages is not critical.

at-least-once

at-least-once guarantees that each message will be delivered at least once. These semantics are suitable for applications where it is important that no messages are lost.

Kafka producers send messages and wait for confirmation from the broker that the message has been persisted. If confirmation is not received, the producer retries the send. Such retries may result in duplicate messages if the previous send actually succeeded but the acknowledgment was lost due to network latency or failures.

Since Kafka version 0.11, the ability to configure idempotency has been introduced to reduce duplication.
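
A typical producer configuration for at-least-once delivery waits for acknowledgment from all in-sync replicas, retries failed sends, and enables idempotence to reduce duplicates; a minimal sketch (the broker address is illustrative):

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
retries=3
enable.idempotence=true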

Consumers should regularly commit the offsets of processed messages. If a consumer fails before committing an offset, after a restart it will resume from the last committed offset, which may lead to re-processing of some messages.

The main problem with at-least-once semantics is the potential for duplication of messages if some failure occurs.

exactly-once

exactly-once guarantees that each message will be processed exactly once, eliminating the possibility of data loss or duplication. These semantics are built on the transactional API available to both producers and consumers.

To process data correctly in the presence of failures and transaction rollbacks, use idempotent producers. They attach a producer ID and sequence numbers to messages to prevent duplicate log entries, even if a message is resent due to network outages or errors. This is enabled by setting the parameter enable.idempotence to true.

The transactional API allows producers to begin, commit, and abort transactions, which makes writes atomic across several partitions. Transactions must be managed so that either all messages in a batch become visible to consumers, or none of them do. To use the transactional API, you must configure the producer with a unique transactional.id.

On the consumer side, you can use the isolation.level parameter, which determines which messages will be read: only those from committed transactions (read_committed) or all messages, regardless of transaction state (read_uncommitted).
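
For example, a consumer that should only see committed transactional data might be configured like this (the group id is illustrative):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "transactional-consumers");
props.put("isolation.level", "read_committed"); // read only messages from committed transactions
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");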

Producer setup example:

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
enable.idempotence=true
transactional.id=unique_transaction_id
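
A sketch of how a producer configured this way might use the transactional API; props is assumed to be a java.util.Properties object populated with the settings above, and the topic name and error handling are simplified for illustration:

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // register the transactional.id with the broker

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("my-topic", "key1", "value1"));
    producer.send(new ProducerRecord<>("my-topic", "key2", "value2"));
    producer.commitTransaction(); // both records become visible to read_committed consumers atomically
} catch (Exception e) {
    producer.abortTransaction(); // neither record is exposed to read_committed consumers
    throw e;
} finally {
    producer.close();
}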

Offset management options

Automatic

Automatic offset management allows consumers to commit message offsets automatically at a specified interval.

The basic settings for automatic offset management are enable.auto.commit and auto.commit.interval.ms. By default, enable.auto.commit is set to true, which enables automatic offset commits. The auto.commit.interval.ms parameter controls how often offsets are committed automatically and defaults to 5000 milliseconds.

Example Kafka consumer configuration in Java:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received message: (key: " + record.key() + ", value: " + record.value() + ") at offset " + record.offset());
        }
    }
} finally {
    consumer.close();
}

The consumer subscribes to the topic and polls for new messages in a loop, automatically committing offsets every 5 seconds.

While automatic offset management is convenient, it can result in messages being reprocessed in the event of failures. If the consumer shuts down or restarts between polls, it may reprocess messages it has already received, because the latest offsets may not have been committed. To mitigate this, you can commit offsets more frequently or use manual commits.

Manual control

Kafka has two main methods for manually committing offsets: the synchronous commitSync() and the asynchronous commitAsync(). A synchronous commit blocks the thread until the commit completes, which provides reliability but can reduce performance. An asynchronous commit sends the request and returns immediately, which increases throughput but requires additional error-handling logic, since it does not retry on failure.
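
For comparison, a synchronous commit inside the poll loop might look like this (process() stands for hypothetical processing logic):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical processing of the message
    }
    consumer.commitSync(); // blocks until the offsets returned by the last poll are committed
}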

An example of asynchronous offset commits with a callback for error handling:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ExampleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("your-topic", 0);
            consumer.assign(Collections.singletonList(partition));
            consumer.seek(partition, 0); // start from a specific offset

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed record with key %s and value %s at offset %d%n", record.key(), record.value(), record.offset());
                    consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)), (offsets, exception) -> {
                        if (exception != null) {
                            System.err.println("Commit failed for offsets " + offsets);
                        }
                    });
                }
            }
        }
    }
}

The code configures the Kafka consumer to read from a specific offset and asynchronously commit the offset of each processed message. The callback checks whether each commit operation succeeded.

