About auto.offset.reset in Spring Kafka

Apache Kafka tracks a consumer's position in each partition with offsets. Depending on what the consumer needs, the auto.offset.reset parameter can be set to one of three values: earliest, latest, or none. If the parameter is not specified, latest is used by default.
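For reference, this is the raw consumer property in question; a minimal sketch with plain kafka-clients (the server address and group id here are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder group id
// one of: "earliest", "latest" (the default), "none"
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");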

In this article I want to focus on the none value.

Earliest

This value is used when you need to consume messages from the very beginning of the topic's partitions. As a rule, it makes sense when you need to populate a database from scratch, transfer all values from one place to another, and so on.

Latest

The default setting. It is used when we only care about fresh data, i.e. messages arriving from the moment the group starts reading: with no committed offset, the consumer begins at the end of the partition. Nothing unusual here either.

None

The value this article was written for. With none, Kafka does not apply any reset rule for a consumer group that has no committed offsets; instead, the consumer throws an exception in that situation.
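A minimal sketch of that behavior, assuming a plain kafka-clients consumer and a brand-new group id (the server address, group id, and topic name are placeholders): the first poll() that assigns partitions fails with NoOffsetForPartitionException, because there is no committed offset and no reset policy to fall back on.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NoneDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "brand-new-group");         // group with no committed offsets
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));     // placeholder topic
            consumer.poll(Duration.ofSeconds(5));                            // no offset, no reset policy -> throws
        } catch (NoOffsetForPartitionException e) {
            System.out.println("No initial offset for group: " + e.getMessage());
        }
    }
}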

I was faced with a task: write a microservice that transfers data from one Kafka cluster to another (an experienced reader may point out that Kafka Connect, Apache Kafka MirrorMaker and the like exist for this, but the requirements were not mine to choose, and they called for a microservice).

One of the special requirements was that on the very first launch the service must set its offset to "current time – 24 hours" and begin consuming messages from that point (a lot of data arrives per day).
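The consumer API has a method for exactly this case: offsetsForTimes() maps a timestamp to the earliest offset whose record timestamp is at or after it. A minimal sketch for a single, manually assigned partition (the topic name is a placeholder):

// assumes an existing KafkaConsumer<String, String> consumer
TopicPartition partition = new TopicPartition("some-topic", 0); // placeholder topic
consumer.assign(Collections.singletonList(partition));

long dayAgo = Instant.now().minus(Duration.ofHours(24)).toEpochMilli();
Map<TopicPartition, OffsetAndTimestamp> found =
        consumer.offsetsForTimes(Collections.singletonMap(partition, dayAgo));

OffsetAndTimestamp hit = found.get(partition);
if (hit != null) {
    consumer.seek(partition, hit.offset()); // first record at most 24 hours old
} else {
    consumer.seekToEnd(Collections.singletonList(partition)); // nothing that recent, start from the end
}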

As always, an easier and faster solution comes to mind first: add an initial-load flag and pass it when the application starts, which is what was done (imagine that we do not use any database, because the business did not allocate money for one). But what if the application collects the data and crashes an hour later, after all the data has already been consumed? On a restart with the flag still set, everything gets re-read, and the produced topic ends up with duplicates. What to do then?

Yes, you could go out of your way to remove the initial-load flag immediately after the first launch; the offsets for the group would already exist by then anyway. But that is a crutch of a solution, and crutches need to be gotten rid of.

Without thinking twice, I decided to rewrite the code (in fact, I thought of rewriting it when I ran into the consumer-failure-and-duplication situation described above). I had always heard that auto.offset.reset takes 3 values, two of which I had experience building applications with, but the third, none, remained a mystery to me.

Using none seemed like a no-brainer to me. But the further I got into the code, the more frustrated I became. The thing is that there are no examples with none anywhere, and I searched all of Google (more precisely, the entire first page of results). The official documentation says:

You can also select “none” if you would rather set the initial offset yourself and you are willing to handle out of range errors manually.

But there are no examples of correct usage. All the other sources say roughly the same thing: "this is a niche value, suitable for rare cases, so we will not consider it."

After a day, I finally came to a solution that I intend to share. I admit you can write better code than mine; I am far from an ideal programmer, and if you can tell me how to structure it better (or suggest a different solution altogether), I will be extremely grateful.

Code

First, set the parameter to none in application.yml (or application.properties):

spring:
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: lalala-group-id
      auto-offset-reset: none
      enable-auto-commit: false
      client-id: lalala-client-id
    listener:
      ack-mode: manual # needed so Spring injects the Acknowledgment argument into the listener

I also turned off enable.auto.commit. Note the listener ack-mode: manual line added to the config above: without a manual ack mode, Spring Kafka will not inject the Acknowledgment argument used in the listener further below.

package org.example.service;

import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.Instant;
import java.util.*;

@Service
@RequiredArgsConstructor
@Log4j2
public class SettingService {
    private final ConsumerFactory<String, String> consumerFactory;

    @Value("${spring.kafka.topic.test}")
    String topic;

    @PostConstruct
    public void checkAndResetOffsetsIfNeeded() {
        Properties consumerProps = new Properties();
        // Reuse the configuration held by the ConsumerFactory
        consumerProps.putAll(consumerFactory.getConfigurationProperties());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            // Create a temporary consumer and fetch the partitions of the target topic
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            Set<TopicPartition> topicPartitions = new HashSet<>();

            for (PartitionInfo partitionInfo : partitionInfos) {
                topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
            }

            // Assign the partitions to the consumer manually
            consumer.assign(topicPartitions);

            // Check whether the consumer group already has committed offsets for these partitions
            Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions);
            Instant resetTime = Instant.now().minus(Duration.ofHours(24));

            Map<TopicPartition, Long> latestOffsets = consumer.endOffsets(topicPartitions);

            for (TopicPartition topicPartition : topicPartitions) {
                // Look up the committed offset for this partition
                OffsetAndMetadata committedOffset = committedOffsets.get(topicPartition);
                Long latestOffset = latestOffsets.get(topicPartition);

                // If there is no committed offset, set one corresponding to resetTime
                if (committedOffset == null) {
                    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer
                            .offsetsForTimes(Collections.singletonMap(topicPartition, resetTime.toEpochMilli()));
                    OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition);

                    // Fall back to the end of the partition if there is no record newer than resetTime
                    long newOffset = offsetAndTimestamp != null ? offsetAndTimestamp.offset() : latestOffset;
                    log.info("Resetting offset for partition {}", topicPartition.partition());

                    // Commit the new offset explicitly, since auto-commit is disabled
                    consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(newOffset)));
                } else {
                    // A committed offset already exists, nothing to do
                    log.info("Offset for partition {} is already committed at {}",
                            topicPartition.partition(), committedOffset.offset());
                }
            }

            log.info("closing consumer setting");
        }
    }
}

The KafkaListener is configured in the usual way:

@KafkaListener(
        groupId = "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.topic.test}"
)
public void listenTopic(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
    try {
        // some processing here
        acknowledgment.acknowledge();
    } catch (ProducerException e) {
        log.error(e.getMessage());
        acknowledgment.nack(Duration.ofSeconds(3));
    }
}

The code above creates a temporary consumer before the KafkaListener starts, and the offsets are set up through that consumer. The method runs via the @PostConstruct annotation, so it first does its work with the temporary consumer and closes it, and only then does our KafkaListener start. No conflicts, period.

Calling the method through the CommandLineRunner interface will not work as expected, because the KafkaListener would start at the same time as the temporary consumer, and an offset-related exception would be thrown (I don't remember exactly which one; experiment). I also tried disabling automatic startup, but that requires setting the KafkaListener id parameter, and I did not experiment further with it, since it would mean even more code (and I am not sure the number of consumers could be scaled as easily); a rough sketch of that route follows below.
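For completeness, here is roughly what that alternative could look like; a sketch under my own assumptions (the listener id "lalala-listener" is made up, and the @PostConstruct annotation would be dropped from SettingService): declare the listener with autoStartup = "false" and start it by hand once the offsets are committed.

@KafkaListener(
        id = "lalala-listener",      // made-up id; required to look the container up later
        autoStartup = "false",       // do not start together with the application context
        groupId = "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.topic.test}"
)
public void listenTopic(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
    // same body as above
}

// e.g. in a @Configuration class:
@Bean
CommandLineRunner startListenerAfterOffsetSetup(KafkaListenerEndpointRegistry registry,
                                                SettingService settingService) {
    return args -> {
        settingService.checkAndResetOffsetsIfNeeded();            // commit the offsets first
        registry.getListenerContainer("lalala-listener").start(); // only then start consuming
    };
}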

I also tried to manipulate the offsets through error handling, but apparently I missed something, and I could not get it to work (I don't rule out that this approach is entirely possible).
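My guess is that the error-handling route would look roughly like this with a plain kafka-clients consumer; an untested sketch of the idea, not the solution I shipped:

// Poll and recover from the missing-offset error by seeking to "now – 24h"
void pollHandlingNoOffset(KafkaConsumer<String, String> consumer) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        // ... process records ...
    } catch (NoOffsetForPartitionException e) {
        long dayAgo = Instant.now().minus(Duration.ofHours(24)).toEpochMilli();
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : e.partitions()) {
            query.put(tp, dayAgo);
        }
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry
                : consumer.offsetsForTimes(query).entrySet()) {
            TopicPartition tp = entry.getKey();
            OffsetAndTimestamp ts = entry.getValue();
            // Fall back to the end of the partition if nothing that recent exists
            long offset = ts != null
                    ? ts.offset()
                    : consumer.endOffsets(Collections.singleton(tp)).get(tp);
            consumer.seek(tp, offset); // the next poll() reads from here
        }
    }
}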

This is how you can work with none (I think I'm the first to give an example of working with this value). If you already have examples of working with it, I'll be glad to take a look; leave them in the comments 🙂

All the best to all Habr readers!
