Producer / Consumer with Kafka and Kotlin

A translation of an article, prepared ahead of the start of the course "Backend Development in Kotlin"


In this article, we will talk about how to create a simple Spring Boot application with Kafka and Kotlin.

Introduction

Start by visiting https://start.spring.io and add the following dependencies:

Groovy

implementation("org.springframework.boot:spring-boot-starter-data-rest")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.apache.kafka:kafka-streams")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")

In this example we use Gradle for the build; you can just as well choose Maven.
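These dependencies go into the dependencies block of build.gradle.kts. For context, the surrounding plugin setup generated by start.spring.io looks roughly like the sketch below (the versions are assumptions from the time this article was written and will differ for you):

```kotlin
plugins {
    // Spring Boot plugin and dependency management, as generated by start.spring.io
    id("org.springframework.boot") version "2.3.0.RELEASE"
    id("io.spring.dependency-management") version "1.0.9.RELEASE"
    // Kotlin JVM compilation plus the Spring compiler plugin (opens Spring-annotated classes)
    kotlin("jvm") version "1.3.72"
    kotlin("plugin.spring") version "1.3.72"
}
```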

Generate and download the project, then import it into IntelliJ IDEA.

Download Apache Kafka

Download the latest version of Apache Kafka from the official website and unzip it to a folder. I am using Windows 10. When you start Kafka, you may run into an error like "The input line is too long". This happens because Kafka's startup scripts build a very long classpath from its folder structure. If this is not resolved automatically, move Kafka to a folder with a shorter path and run the scripts from PowerShell.

To start Kafka, use the following commands:

Shell

.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
.\kafka-server-start.bat ..\..\config\server.properties

You will find these two scripts in the bin\windows folder.

To start Kafka, you first need to start ZooKeeper. ZooKeeper is an Apache project that provides a distributed configuration service.
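With the broker running, you can also create the topic used throughout this article up front. A sketch, assuming Kafka is running on localhost:9092 (in recent Kafka versions the --bootstrap-server flag replaces the older --zookeeper flag):

```shell
# Create the topic used in the rest of the article (run from bin\windows)
.\kafka-topics.bat --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```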

Launch Spring Boot

The first step is to create a class called KafkaDemoApplication.kt in your IDE. If you generated the project on the Spring website, this class was created automatically.

Add the following code:

Kotlin

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class KafkaDemoApplication

fun main(args: Array<String>) {
    runApplication<KafkaDemoApplication>(*args)
}

Producer

We can send messages to topics in two ways. We will consider them below.

We will develop a controller class for sending and receiving messages. Call this class KafkaController.kt and add the following method:

Kotlin

@Autowired
var kafkaTemplate: KafkaTemplate<String, String>? = null
val topic: String = "test_topic"

@GetMapping("/send")
fun sendMessage(@RequestParam("message") message: String): ResponseEntity<String> {
    val lf: ListenableFuture<SendResult<String, String>> = kafkaTemplate?.send(topic, message)!!
    val sendResult: SendResult<String, String> = lf.get()
    return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic")
}

To send messages to the topic called test_topic, we use KafkaTemplate. It returns a ListenableFuture, from which we can get the result of the operation. This approach is the easiest if you just want to send a message to a topic.
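Spring Boot auto-configures a KafkaTemplate for you, but it can also be declared explicitly as a bean. A minimal sketch, with the same settings that the manual KafkaProducer example uses (the broker address is an assumption for a local setup):

```kotlin
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory

@Configuration
class KafkaProducerConfig {

    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        // Broker address and string serializers for key and value
        val config = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
        )
        return DefaultKafkaProducerFactory(config)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> = KafkaTemplate(producerFactory())
}
```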

Second way

The next way to send a message to the Kafka topic is to use an object KafkaProducer. To do this, we will write the following code:

Kotlin

@GetMapping("/produce")
fun produceMessage(@RequestParam("message") message: String): ResponseEntity<String> {
    val producerRecord: ProducerRecord<String, String> = ProducerRecord(topic, message)

    val map = mutableMapOf<String, Any>()
    map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"
    map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    map["bootstrap.servers"] = "localhost:9092"

    val producer = KafkaProducer<String, String>(map)
    val future: Future<RecordMetadata> = producer.send(producerRecord)
    return ResponseEntity.ok(" message sent to " + future.get().topic())
}

A little explanation is needed here.

We need to initialize the KafkaProducer object with a Map that contains the key and value serializers. In our example we are sending a string message, so we only need StringSerializer.

Basically, a Serializer is a Kafka interface that converts objects to bytes. Apache Kafka has other serializers as well, such as ByteArraySerializer, BytesSerializer, FloatSerializer, etc.
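For illustration, StringSerializer simply turns a string into its UTF-8 bytes. A small standalone sketch, assuming the kafka-clients library is on the classpath:

```kotlin
import org.apache.kafka.common.serialization.StringSerializer

fun main() {
    val serializer = StringSerializer()
    // serialize() takes the topic name and the value; StringSerializer ignores the topic
    val bytes = serializer.serialize("test_topic", "hello")
    println(bytes.contentToString()) // the five UTF-8 bytes of "hello"
}
```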

In the map, we specify StringSerializer for both the key and the value:

Kotlin

map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"

The next value is the bootstrap server information needed to communicate with the Kafka cluster.

Kotlin

map["bootstrap.servers"] = "localhost:9092"

All these attributes are needed if we use KafkaProducer.

Then we need to create a ProducerRecord with the topic name and the message itself, which is exactly what we do on the following line:

Kotlin

val producerRecord: ProducerRecord<String, String> = ProducerRecord(topic, message)

Now we can send our message to the topic using the following code:

Kotlin

val future: Future<RecordMetadata> = producer.send(producerRecord)

This call returns a Future<RecordMetadata>, from which we can read the name of the topic the message was sent to.
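Besides the topic name, the RecordMetadata obtained from the future also carries the partition and offset the record was written to. A short sketch continuing from the code above (the `future` variable is the one returned by producer.send):

```kotlin
// future is the Future<RecordMetadata> returned by producer.send(producerRecord);
// get() blocks until the broker acknowledges the write
val metadata: RecordMetadata = future.get()
println("topic=${metadata.topic()} partition=${metadata.partition()} offset=${metadata.offset()}")
```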

Consumer

We have looked at how to send messages to topics, but we also need to listen for incoming messages. To do this, you need to create a listener that will consume them.
Let's create a class MessageConsumer.kt and annotate it with @Service.

Kotlin

@KafkaListener(topics = ["test_topic"], groupId = "test_id")
fun consume(message: String) {
    println(" message received from topic : $message")
}

Thanks to the @KafkaListener annotation, this method listens for messages and prints them to the console as soon as they appear in the topic. Just make sure you use the same topic name you used for sending.
You can find the source code in my repositories on GitHub.
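For the listener to work, Spring also needs consumer settings. With Spring Boot the simplest way is via application.properties; a minimal sketch (the broker address assumes a local setup):

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_id
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```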


Learn more about the course "Backend Development in Kotlin"

