Manual message confirmation in Kafka

This article shows how to acknowledge message processing in Kafka manually by committing the message offset yourself. The example uses Java, Spring and Kafka.

Plan:

  1. Implement a Kafka listener with manual confirmation.

  2. Add an application.properties file with the settings for committing the offset manually.

  3. Description of Kafka's confirmation mechanism and definitions.

1. Implement a Kafka listener with manual confirmation.

Let's create a Kafka listener using the @KafkaListener annotation. Next, add an Acknowledgment parameter to the listener method; it is used to confirm that the message has been processed:

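import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class ServiceCallKafkaListener {

    @KafkaListener(id = "listenerId",
            groupId = "groupListenerId",
            topics = "topicName")
    public void listenServiceCall(@Payload String message,
                                  Acknowledgment acknowledgment) {
        // your message processing logic goes here;
        // the flag below is a placeholder for its result
        boolean logicForMessageProcessingCompleted = true;
        if (logicForMessageProcessingCompleted) {
            // manual commit: confirm that this message has been processed
            acknowledgment.acknowledge();
        }
    }
}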

Call the acknowledge() method of the Acknowledgment object in the body of the listener method, which here is listenServiceCall(). The acknowledge() call confirms that the message has been processed, and the offset for that message is then committed to Kafka.

2. Add the application.properties file and settings

Let's add the Kafka settings to the application.properties file so that offsets are committed manually instead of automatically.

# for a detailed description, see "Sources for further reading" at the end of this article
# acknowledgment mode: commit the offset as soon as acknowledge() is called
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

# turn off auto commit in the Kafka consumer
spring.kafka.consumer.enable-auto-commit=false
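
If you prefer Java configuration, the acknowledgment mode can also be set on the listener container factory. This is only a sketch, not part of the original example. The class name KafkaManualAckConfig is hypothetical, and the code assumes that a ConsumerFactory<String, String> bean with auto commit turned off is already defined elsewhere in the application.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical configuration class, shown only for illustration.
@Configuration
public class KafkaManualAckConfig {

    // Assumption: a ConsumerFactory<String, String> bean exists in the application
    // and has enable.auto.commit set to false.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Counterpart of spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```

For a simple application the two lines in application.properties are usually enough; a factory bean like this is mostly useful when different listeners need different settings.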

3. Description of the Kafka confirmation mechanism and definitions.

Confirmation (acknowledgment) is a signal that the listener has finished processing a message. The listener in this article is the method listenServiceCall. When acknowledge() is called, the offset is committed to Kafka for the consumer group, and the committed position points to the message that follows the acknowledged one.

If the listener does not acknowledge a message, its offset is not committed. The running consumer still goes on to the following messages, but after a restart or a rebalance the listener (KafkaListener) reads again from the last committed offset and receives the unacknowledged message once more.

An offset in Kafka is the sequential position of a message within a topic partition. It identifies the message inside that partition.
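
To make the difference between the current read position and the committed offset easier to see, here is a toy in-memory model. It is not Kafka or Spring API. The classes PartitionModel and OffsetDemo and all of their methods are made up for this illustration, and the model assumes a single partition read by a single consumer.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition read by one consumer group; not a Kafka API. */
final class PartitionModel {
    private final List<String> log = new ArrayList<>(); // list index == offset
    private long committed = 0; // where reading resumes after a restart
    private long position = 0;  // next offset the running consumer will read

    /** Appends a message and returns the offset assigned to it. */
    long append(String value) {
        log.add(value);
        return log.size() - 1;
    }

    /** Returns the message at the current position and advances it, or null at the end. */
    String poll() {
        if (position >= log.size()) {
            return null;
        }
        return log.get((int) position++);
    }

    /** Models acknowledge(): commits the offset of the processed message plus one. */
    void acknowledge(long processedOffset) {
        committed = Math.max(committed, processedOffset + 1);
    }

    /** Models a restart or rebalance: reading resumes from the committed offset. */
    void restart() {
        position = committed;
    }

    long committed() {
        return committed;
    }
}

final class OffsetDemo {
    public static void main(String[] args) {
        PartitionModel partition = new PartitionModel();
        partition.append("a");
        partition.append("b");
        partition.append("c");

        String first = partition.poll();        // "a" at offset 0
        partition.acknowledge(0);               // committed offset becomes 1
        String second = partition.poll();       // "b" at offset 1, never acknowledged
        partition.restart();                    // position goes back to 1
        String redelivered = partition.poll();  // "b" is delivered again

        System.out.println(first + " " + second + " " + redelivered); // prints "a b b"
    }
}
```

In this model the unacknowledged message "b" comes back only after restart() is called, which mirrors the behaviour of a consumer that resumes from its last committed offset.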

Sources for further reading.

  1. Acknowledgment mode description

  2. Acknowledgment method documentation
