Kafka Client for the Ktor Framework


I needed to write a Ktor application using Apache Kafka and Kafka Streams. I could not find an official client or plugin for Ktor, and I did not want to work with vanilla Kafka, so, like any self-respecting developer, I decided to reinvent the wheel.

Dependency

So now the client/plugin can be pulled into your project by adding JitPack to your build file and pointing it at the Git repository:

repositories {
    maven {
        url = uri("https://jitpack.io")
    }
}

dependencies {
    implementation("com.github.IlyaKalashnikov:ktor-kafka-client:-SNAPSHOT")
}

Admin

The client is installed into a Ktor application like any other plugin:

install(Kafka) {
    this.kafkaConfig = mapOf<String, Any>(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers
    )
    this.topics = listOf(
        NewTopic(topic, 1, 1)
    )
}

You only need to pass the client the list of topics to be created and whatever configuration you are interested in.

The rest of the Admin functionality is available through the buildKafkaAdmin() function, which takes a single parameter of type Map where you can declare all the settings you need.
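
For illustration, here is a minimal sketch under the assumption that buildKafkaAdmin() hands back a standard org.apache.kafka.clients.admin.Admin instance; if so, all the usual admin operations are available:

val admin = buildKafkaAdmin(
    // Assumption: the function returns a regular Kafka Admin client
    mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers)
)
// List the topics that already exist in the cluster
val existingTopics = admin.listTopics().names().get()
println("Topics in the cluster: $existingTopics")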

Consumer and Producer

The consumer() function is responsible for creating a KafkaConsumer; it takes a Map argument for the configuration and a List of topics the consumer should subscribe to.

val consumer = consumer<String, String>(
    mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.GROUP_ID_CONFIG to "amazing-consumer-group",
        ConsumerConfig.CLIENT_ID_CONFIG to "amazing-consumer-client"
    ),
    listOf("amazing-topic")
)
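
The result is a regular Apache Kafka KafkaConsumer that is already subscribed to the listed topics, so the standard poll loop applies (a minimal sketch; java.time.Duration is assumed to be imported):

// Poll the consumer created above in the usual way
while (true) {
    val records = consumer.poll(Duration.ofMillis(500))
    for (record in records) {
        println("received ${record.key()} -> ${record.value()} from ${record.topic()}")
    }
}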

A KafkaProducer is created in the same way:

val producer = producer<String, String>(
    mapOf(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
    )
)
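
The result here is likewise a regular KafkaProducer, so sending a record works as it always does; a small sketch with a placeholder key and value:

// send() returns a Future<RecordMetadata>; call get() if you want to wait for the ack
val future = producer.send(ProducerRecord("amazing-topic", "some-key", "hello"))
val metadata = future.get()
println("written to ${metadata.topic()}-${metadata.partition()}@${metadata.offset()}")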

Consumer, Producer, and Streams instances can be given their own serializers and deserializers. Here is an example of a custom Serde:

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serializer
import java.io.IOException

class YourClassSerde : Serializer<YourClass>, Deserializer<YourClass>, Serde<YourClass> {
    private val mapper = ObjectMapper()

    override fun serialize(topic: String?, data: YourClass): ByteArray {
        try {
            return mapper.writeValueAsBytes(data)
        } catch (e: Exception) {
            throw SerializationException("Error serializing JSON message", e)
        }
    }

    override fun deserialize(topic: String?, data: ByteArray?): YourClass? {
        return if (data == null) {
            null
        } else try {
            mapper.readValue(data, YourClass::class.java)
        } catch (e: IOException) {
            throw SerializationException(e)
        }
    }

    // Serde requires explicit accessors; this class acts as both halves
    override fun serializer(): Serializer<YourClass> = this
    override fun deserializer(): Deserializer<YourClass> = this

    // Serializer, Deserializer, and Serde all declare default configure()/close(),
    // so the conflict has to be resolved explicitly
    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {}
    override fun close() {}
}

The process is described in detail here.
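
For illustration, such a serde could be plugged straight into the client's consumer() call; YourClass is a placeholder type, and YourClassSerde works as the value deserializer because it implements Deserializer<YourClass>:

// Sketch: the custom serde as the value deserializer (YourClass is hypothetical)
val typedConsumer = consumer<String, YourClass>(
    mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to YourClassSerde::class.java,
        ConsumerConfig.GROUP_ID_CONFIG to "typed-consumer-group"
    ),
    listOf("amazing-topic")
)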

Kafka Streams

With KafkaStreams, what matters is not so much the initialization of the object itself as the configuration of the topology. If you are not familiar with the principles of Kafka Streams and its Topology, you can learn more about them here.

The kafkaStreams function is responsible for creating the streams instance; it takes as input an object of the following shape:

class KafkaStreamsConfig(
    val topic: String,
    val topologyBuilder: StreamsBuilder.() -> Unit,
    val streamsConfig: Map<String, Any>,
    val builder: StreamsBuilder
)

Initializing KafkaStreams with the most primitive topology might look like this:

val streams = kafkaStreams(
    KafkaStreamsConfig(
        topic = topic,
        topologyBuilder = fun StreamsBuilder.() {
            val stream = this.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            stream.toTable(Materialized.`as`(table))
        },
        streamsConfig = mapOf(
            StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
            StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass,
            StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to Serdes.String().javaClass,
            StreamsConfig.APPLICATION_ID_CONFIG to "your-amazing-app",
            StreamsConfig.CLIENT_ID_CONFIG to "your-amazing-client",
            StreamsConfig.COMMIT_INTERVAL_MS_CONFIG to 1000,
        ),
        builder = StreamsBuilder()
    )
)

I wanted users to be able to pass a topology of any complexity to the client as concisely as possible. In the end it could not be shortened all that much, but in my opinion it is still more convenient to do through the client. An example of a more elaborate topology is shown below.

Usage example

As an example, let’s write a simple application that accepts requests on the “/produce?message=…” and “/count” endpoints, and either sends a message to Kafka or returns the words users have submitted together with how many times each one was repeated. We will use Koin for dependency injection, which also brings the application a bit closer to real-world conditions.

In the main module we declare our “beans” and register them in the Koin module. Of course, it would be better to split the code into separate files, but for demonstration purposes we will make do with the default Application.kt.

fun Application.module() {
    install(Kafka) {
        this.kafkaConfig = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVERS
        )
        this.topics = listOf(NewTopic(TOPIC, 1, 1))
    }

    install(Koin) {
        val producer = producer<String, String>(
            mapOf(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVERS,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
            )
        )

        val streams = kafkaStreams(
            KafkaStreamsConfig(
                topic = TOPIC,
                topologyBuilder = topology(),
                streamsConfig = streamsConfig(),
                builder = StreamsBuilder()
            )
        )
        streams.cleanUp()
        streams.start()


        environment.monitor.subscribe(ApplicationStopped) {
            streams.close(Duration.ofSeconds(5))
        }

        val storage : ReadOnlyWindowStore<String, Long> =
            streams.store(
                StoreQueryParameters.fromNameAndType(
                    "windowed-word-count",
                    QueryableStoreTypes.windowStore()
                )
            )

        val kafkaModule = module {
            single { producer }
            single { streams }
            single { storage }
        }

        modules(kafkaModule)
    }
}

The streams configuration, however, lives in a separate file for readability:

fun topology(): StreamsBuilder.() -> Unit {
    return fun StreamsBuilder.() {
        val stringSerde = Serdes.String()
        val textLines: KStream<String, String> =
            this.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()))

        val groupedByWord: KGroupedStream<String?, String> =
            textLines
                .flatMapValues { value: String ->
                    value.lowercase(Locale.getDefault()).split("\\W+".toRegex())
                }
                .groupBy(
                    { _, word -> word },
                    Grouped.with(stringSerde, stringSerde)
                )

        groupedByWord.count(
            Materialized.`as`<String, Long, KeyValueStore<Bytes, ByteArray>>("word-count")
                .withValueSerde(Serdes.Long())
        )

        groupedByWord.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(
                Materialized.`as`<String, Long, WindowStore<Bytes, ByteArray>>("windowed-word-count")
                    .withValueSerde(Serdes.Long())
            )
    }
}

fun streamsConfig(): Map<String, Any> {
    return mapOf(
        StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVERS,
        StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass,
        StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to Serdes.String().javaClass,
        StreamsConfig.APPLICATION_ID_CONFIG to "amazing-app",
        StreamsConfig.CLIENT_ID_CONFIG to "amazing-client",
        StreamsConfig.COMMIT_INTERVAL_MS_CONFIG to 1000,
    )
}

As the example shows, to set up the topology you pass an extension function on StreamsBuilder to the client; inside it you can perform all your manipulations on the `this` receiver.
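
Since any StreamsBuilder.() -> Unit will do, the topology can also be written inline as a lambda; a minimal sketch, where "filtered-topic" is a hypothetical output topic:

// Read "amazing-topic", drop blank values, and write the rest to "filtered-topic"
val inlineTopology: StreamsBuilder.() -> Unit = {
    stream("amazing-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .filter { _, value -> value.isNotBlank() }
        .to("filtered-topic", Produced.with(Serdes.String(), Serdes.String()))
}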

Now let’s write the controllers and “inject” the KafkaProducer and window-store singletons there.

fun Application.clientExample() {

    val producer by inject<KafkaProducer<String, String>>()
    val storage by inject<ReadOnlyWindowStore<String,Long>>()

    routing {
        get("/produce") {
            val message = call.request.queryParameters["message"]
            // Note: a seeded Random(1337) recreated per request would always produce the same key
            producer.send(ProducerRecord("amazing-topic", Random.nextInt().toString(), message))
            call.respondText("Sent message $message to your amazing topic")
        }

        get("/count") {
            val records = storage.all()
            val builder = StringBuilder()
            for (element in records) {
                builder.append("${element.key.key()} ${element.value} | ")
            }
            call.respondText { builder.toString() }
        }
    }
}
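
Beyond iterating over everything, a ReadOnlyWindowStore also supports keyed, time-bounded lookups; a small sketch of reading one word’s counts over the last hour (the key "kafka" is just an example, and java.time imports are assumed):

// Fetch the windows for a single key within a time range
val now = Instant.now()
storage.fetch("kafka", now.minus(Duration.ofHours(1)), now).use { windows ->
    for (entry in windows) {
        // entry.key is the window start timestamp, entry.value is the count
        println("window starting at ${entry.key}: ${entry.value} occurrences")
    }
}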

The full project code is available here. At the root there is a docker-compose file for deploying Kafka and Zookeeper.

I will be glad if the client proves useful to someone, or if it prompts someone to write their own implementation. I used this project as a reference: https://github.com/gAmUssA/ktor-kafka. My version differs in that the user can not only obtain KafkaStreams instances but also fully configure the topology they need. Beyond that, in my opinion, my implementation is more intuitive to configure.

I would appreciate suggestions and constructive criticism.
