Testing Messages in Kafka

This article proposes an approach to writing integration tests for Kafka applications that allows you to focus on the interaction specification, making the tests more readable and easier to maintain. The presented approach not only improves the efficiency of testing, but also contributes to a better understanding of the integration processes in the application.

This article builds on three ideas from the corresponding articles: writing tests with clear separation of Arrange-Act-Assert steps, isolation in Kafka tests, and using tools to improve test visibility. I recommend you familiarize yourself with them before diving into this article.

Demonstration scenario

Let's take as an example a Telegram bot that forwards requests to the OpenAI API and returns the result to the user. If a request to OpenAI violates the system's security rules, the client will receive a message about it. Additionally, a message will be sent to Kafka for the behavioral control system so that the manager can contact the user and explain that his request was too spicy even for our robot, and ask him to reconsider his preferences.

The service interaction contracts are described in a simplified form to highlight the main logic of the work. Below is a sequence diagram demonstrating the architecture of the application. I understand that the design may raise questions from the point of view of system architecture, but please treat this with understanding – the main goal here is to demonstrate the approach to writing tests.

Message Capture Object

The main testing tool will be the message capture object – RecordCaptor. It is very similar in its operation to the outgoing request capture object – RequestCaptorwhich you can read about in the article Putting the stages of testing http requests in Spring into order.

Message capturing will be performed via the standard Kafka consumer. The list of topics must be specified explicitly via a configuration parameter.

@KafkaListener(id = "recordCaptor", topics = "#{'${test.record-captor.topics}'.split(',')}", groupId = "test")
public void eventCaptorListener(ConsumerRecord<Object, Object> record,
                                @Headers Map<String, Object> boundedHeaders) {
    RecordSnapshot recordSnapshot = mapper.recordToSnapshot(record, boundedHeaders);
    recordCaptor.capture(recordSnapshot);
}

Object RecordCaptor accumulates information from captured messages. Using this approach requires isolation in Kafka tests. Waiting for offset confirmation before checking test results should be done using the KafkaSupport#waitForPartitionOffsetCommit.

Example test

Below is the test code for the described scenario.

def "User Message Processing with OpenAI"() {
    setup:
    KafkaSupport.waitForPartitionAssignment(applicationContext)                           // 1
    and:                                                                                  // 2
    def openaiRequestCaptor = restExpectation.openai.completions(withBadRequest().contentType(APPLICATION_JSON)
            .body("""{
                "error": {
                "code": "content_policy_violation",
                "message": "Your request was rejected as a result of our safety system."
                }
            }"""))
    def telegramRequestCaptor = restExpectation.telegram.sendMessage(withSuccess('{}', APPLICATION_JSON))
    when:
    mockMvc.perform(post("/telegram/webhook")                                             // 3
            .contentType(APPLICATION_JSON_VALUE)
            .content("""{
                "message": {
                "from": {
                    "id": 10000000
                },
                "chat": {
                    "id": 20000000
                },
                "text": "Hello!"
                }
            }""".toString())
            .accept(APPLICATION_JSON_VALUE))
            .andExpect(status().isOk())
    KafkaSupport.waitForPartitionOffsetCommit(applicationContext)                         // 4
    then:
    openaiRequestCaptor.times == 1                                                        // 5
    JSONAssert.assertEquals("""{
        "content": "Hello!"
    }""", openaiRequestCaptor.bodyString, false)
    and:
    telegramRequestCaptor.times == 1
    JSONAssert.assertEquals("""{
        "chatId": "20000000",
        "text": "Your request was rejected as a result of our safety system."
    }""", telegramRequestCaptor.bodyString, false)
    when:                                                                                 // 6
    def message = recordCaptor.getRecords("topicC", "20000000").last
    then:
    message != null
    JSONAssert.assertEquals("""{
        "webhookMessage": {
        "message": {
            "chat": {
            "id": "20000000"
            },
            "text": "Hello!"
        }
        },
        "error": {
            "code": "content_policy_violation",
            "message": "Your request was rejected as a result of our safety system."
        }
    }""", message.value as String, false)
}

Key steps:

  1. Waiting for partitions to be assigned before starting a test scenario.

  2. Mocking requests to OpenAI and Telegram.

  3. Executing a test scenario.

  4. Waiting for offset confirmation.

  5. Checking requests to OpenAI and Telegram.

  6. Inspecting a message in Kafka.

Usage JSONAssert.assertEquals allows you to ensure consistency in data representation at different levels – in Kafka messages, logs and tests. This simplifies the testing process, providing flexibility of comparison and accuracy of error diagnostics.

The article presents an example with the JSON message format; other formats are not considered, but the described approach does not impose any restrictions on the format.

How to find your message in RecordCaptor

Distribution of messages in RecordCaptor is performed by the topic name and key. In the proposed test, the message key in Kafka is used as the key. When sending, we explicitly specify it:

sendMessage("topicC", chatId, ...);
...
private void sendMessage(String topic, String key, Object payload) {
    Message message = MessageBuilder
            .withPayload(objectMapper.writeValueAsString(payload))
            .setHeader(KafkaHeaders.TOPIC, topic)
            .setHeader(KafkaHeaders.KEY, key)  <-- указываем ключ
            .build();
    kafkaTemplate.send(message).get();
}

Search by message key in topic:

when:                                                                                
def message = recordCaptor.getRecords("topicC", "20000000").last <-- используем 

If this option is not suitable, you need to describe your own indexes by message parameters, on the basis of which you need to build a search. An example can be found in the tests PolicyViolationTestsCustomIndex.groovy.

Connecting RecordCaptor

Code for connection RecordCaptor looks like this:

@TestConfiguration(proxyBeanMethods = false)
public class RecordCaptorConfiguration {
    @Bean
    RecordCaptor recordCaptor() {
        return new RecordCaptor();
    }

    @Bean
    RecordCaptorConsumer recordCaptorConsumer(RecordCaptor recordCaptor) {
        return new RecordCaptorConsumer(recordCaptor, new RecordSnapshotMapper());
    }
}

OffsetSnapshotFrame

Experience has shown that working with applications using Kafka requires tools that make it easier to understand the state of consumers and the status of message consumption. For this task, you can compare topic and consumer group offsets in the operation of waiting for confirmation of the offset and output discrepancies to the log, for example, as shown in the figure:

Code OffsetComparisonFrame available for review.

Conclusion

Testing messages in Kafka using the proposed approach not only simplifies the process of writing tests, but also makes it more structured and understandable. Using tools such as RecordCaptoras well as adherence to the principles of isolation and clear separation of testing stages, allow us to achieve high accuracy and efficiency.

Link to the project repository with demo tests — sandbox/bot.

Thanks for reading this article, and good luck in your quest to write effective and clear tests!

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *