Writing Our Own HTTP Kafka Sink Connector

In this article I will share the implementation of my Kafka HTTP sink connector. It does not claim to be universal, but it may help you figure out how to develop your own connector.

The Confluent HTTP Sink Connector is paid, and the other options I found on GitHub did not work for me. You can read about Kafka Connect here. The article assumes you already understand why the Kafka Connect framework is needed and how to use it. All the code below is written in Kotlin.

First, let's define the Schema for our connector:

// Schema and field name constants: the names are used throughout the article,
// while the string values here are illustrative assumptions
const val HTTP_REQUEST_SCHEMA_NAME = "HttpRequest"
const val FIELD_HTTP_METHOD = "method"
const val FIELD_HTTP_HEADERS = "headers"
const val FIELD_HTTP_BODY = "body"
const val FIELD_HTTP_URL = "url"

val HTTP_REQUEST_SCHEMA: Schema = SchemaBuilder.struct()
    .name(HTTP_REQUEST_SCHEMA_NAME)
    .field(FIELD_HTTP_METHOD, Schema.STRING_SCHEMA)
    .field(
        FIELD_HTTP_HEADERS,
        SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build()
    )
    .field(FIELD_HTTP_BODY, Schema.OPTIONAL_STRING_SCHEMA)
    .field(FIELD_HTTP_URL, Schema.STRING_SCHEMA)
    .build()

The connector's input must be a Struct with Schema = HTTP_REQUEST_SCHEMA, i.e. the configured converter and transforms must cast messages from Kafka to a Struct with this schema.
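For instance, a record value that satisfies the schema can be built like this (a minimal sketch; the method, URL, headers and body values are made up for illustration):

val value = Struct(HTTP_REQUEST_SCHEMA)
    .put(FIELD_HTTP_METHOD, "POST")
    .put(FIELD_HTTP_URL, "http://localhost:8080/orders")
    .put(FIELD_HTTP_HEADERS, mapOf("Content-Type" to "application/json"))
    .put(FIELD_HTTP_BODY, """{"id": 1}""")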

To implement the connector, we will use the standard java.net.http.HttpClient. The configuration class for the connector will look like this:

class HttpSinkConfig(private val props: Map<String, String>) : AbstractConfig(configDef, props) {

   companion object {
       private const val RESPONSE_VALIDATOR_CLASS_NAME = "response.validator"
       private const val CONNECTION_TIMEOUT_MS = "connectionTimeoutMs"
       // Key for the retry backoff used by the task via config.backoffTimeoutMs()
       private const val BACKOFF_TIMEOUT_MS = "backoffTimeoutMs"

       val configDef = ConfigDef()
           .define(
               RESPONSE_VALIDATOR_CLASS_NAME,
               ConfigDef.Type.STRING,
               HttpSuccessStatusResponseValidator::class.java.name,
               ConfigDef.Importance.HIGH,
               "Class name of validator"
           )
           .define(
               CONNECTION_TIMEOUT_MS,
               ConfigDef.Type.LONG,
               2000,
               ConfigDef.Importance.HIGH,
               "Http connection timeout in ms"
           )
           .define(
               BACKOFF_TIMEOUT_MS,
               ConfigDef.Type.LONG,
               5000, // the default here is an assumption; the example task config sets it explicitly
               ConfigDef.Importance.MEDIUM,
               "Backoff timeout in ms before a retriable error is reprocessed"
           )
   }

   fun responseValidator(): HttpResponseValidator =
       (Class.forName(getString(RESPONSE_VALIDATOR_CLASS_NAME)).getDeclaredConstructor().newInstance() as HttpResponseValidator)
           .apply { init(props) }
  
   fun httpClient(): HttpClient =
       HttpClient.newBuilder()
           .connectTimeout(Duration.ofMillis(connectionTimeoutMs()))
           .build()


   private fun connectionTimeoutMs(): Long = getLong(CONNECTION_TIMEOUT_MS)
   fun backoffTimeoutMs(): Long = getLong(BACKOFF_TIMEOUT_MS)
}

This implementation creates a default HttpClient with only one setting, but the configuration can be extended with SSL properties and anything else the HttpClient needs: by analogy with connectionTimeoutMs, you declare them in the ConfigDef and pass them to the builder.
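For example, wiring a redirect policy into the client could look like this (a sketch; the followRedirects property name and its default are assumptions, not part of the connector above):

// In configDef, by analogy with CONNECTION_TIMEOUT_MS:
.define(
    "followRedirects",
    ConfigDef.Type.BOOLEAN,
    false,
    ConfigDef.Importance.LOW,
    "Whether the http client follows redirects"
)

// And in httpClient():
fun httpClient(): HttpClient =
    HttpClient.newBuilder()
        .connectTimeout(Duration.ofMillis(connectionTimeoutMs()))
        .followRedirects(
            if (getBoolean("followRedirects")) HttpClient.Redirect.NORMAL
            else HttpClient.Redirect.NEVER
        )
        .build()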

responseValidator will be used to validate responses: it lets you plug in a validator that decides which responses count as successful and which as erroneous. Let's define an interface for this:

interface HttpResponseValidator : (HttpResponse<String>) -> Unit {
   @Throws(RetriableException::class)
   override fun invoke(response: HttpResponse<String>)
   fun init(props: Map<String, String>)
}
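The HttpSuccessStatusResponseValidator used as the default in HttpSinkConfig is not shown in the article; a minimal sketch of it, assuming it simply treats any 2xx status as success, might look like this:

class HttpSuccessStatusResponseValidator : HttpResponseValidator {
    override fun init(props: Map<String, String>) {}

    override fun invoke(response: HttpResponse<String>) {
        // Assumption: any non-2xx status is a hard error
        if (response.statusCode() !in 200..299)
            throw IllegalArgumentException("Status ${response.statusCode()} is not a success code")
    }
}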

Here is an implementation that classifies responses by HTTP status code. Three categories of status codes are configured:

  1. successCodes – the response is interpreted as successful

  2. retryCodes – the response is interpreted as a temporary error, in which case the delivery is retried

  3. errorCodes – the response is interpreted as erroneous; further behavior depends on the task's "errors.tolerance" setting: all – processing continues, none – message processing stops

class StatusResponseValidator : HttpResponseValidator {
   private lateinit var retryCodes: List<Int>
   private lateinit var successCodes: List<Int>
   private lateinit var errorCodes: List<Int>


   override fun invoke(response: HttpResponse<String>) {
       if (response.statusCode() !in successCodes) {
           if (response.statusCode() in retryCodes || (retryCodes.isEmpty() && response.statusCode() !in errorCodes))
               throw RetriableException("Status ${response.statusCode()} is not success $successCodes")
           else throw IllegalArgumentException("Status ${response.statusCode()} is not success $successCodes")
       }
   }

   override fun init(props: Map<String, String>) {
       val config = AbstractConfig(
           ConfigDef()
               .define(
                   RETRY_CODES,
                   ConfigDef.Type.LIST,
                   listOf<String>(),
                   ConfigDef.Importance.LOW,
                   "Http response codes for retry"
               )
               .define(
                   SUCCESS_CODES,
                   ConfigDef.Type.LIST,
                   listOf("200"),
                   ConfigDef.Importance.HIGH,
                   "Success http response codes"
               )
               .define(
                   ERROR_CODES,
                   ConfigDef.Type.LIST,
                   listOf<String>(),
                   ConfigDef.Importance.HIGH,
                   "Error http response codes"
               ),
           props
       )
       retryCodes = config.getList(RETRY_CODES).map { it.toInt() }
       successCodes = config.getList(SUCCESS_CODES).map { it.toInt() }
       errorCodes = config.getList(ERROR_CODES).map { it.toInt() }
   }

   companion object {
       private const val RETRY_CODES = "response.validator.codes.retry"
       private const val SUCCESS_CODES = "response.validator.codes.success"
       private const val ERROR_CODES = "response.validator.codes.error"
   }
}

An example of how to configure the response validator in the task settings:

{
  ....
  "response.validator": "ru.typik.kafka.connect.task.StatusResponseValidator",
  "response.validator.codes.success": "200",
  "response.validator.codes.error": "400",
  ....
}

The simplest implementation of a task with HttpClient will look like this:

class HttpSinkTask : SinkTask() {

   companion object {
        private val log = LoggerFactory.getLogger(HttpSinkTask::class.java)
   }

   protected lateinit var config: HttpSinkConfig
   protected lateinit var httpClient: HttpClient
   protected lateinit var responseValidator: HttpResponseValidator

   override fun version(): String = "1.0"
   override fun stop() {}
   override fun flush(currentOffsets: MutableMap<TopicPartition, OffsetAndMetadata>) { }

   override fun start(props: Map<String, String>) {
       log.info("Starting http sink task...")
       config = HttpSinkConfig(props)
       httpClient = config.httpClient()
       responseValidator = config.responseValidator()
   }

   override fun put(records: Collection<SinkRecord>) {
       if (records.isEmpty()) return

       log.debug(
           "Received {} records. First record kafka coordinates:({}-{}-{}).",
           records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset()
       )

       records.forEach { record ->
               record.toHttpRequestSafe()
                   ?.let { request ->
                       request.send()
                           .also { response ->
                               log.trace("Http request: {}, Http response: {}", request, response)
                               response.validate(record)
                           }
                   }
       }
   }

   private fun HttpRequest.send() = try {
       httpClient.send(this, HttpResponse.BodyHandlers.ofString())
   } catch (ex: Exception) {
        // The messages will be reprocessed after backoffTimeoutMs
       log.error("Error during sending http request: $this", ex)
       log.info("Context timeout before retry")
       context.timeout(config.backoffTimeoutMs())
       log.info("Throw exception after context timeout")
       throw RetriableException(ex)
   }

   protected fun HttpResponse<String>.validate(record: SinkRecord) {
       try {
           responseValidator(this)
       } catch (ex: RetriableException) {
            // The messages will be reprocessed after backoffTimeoutMs
           log.info("Context timeout before retry")
           context.timeout(config.backoffTimeoutMs())
           log.info("Throw exception after timeout")
           throw ex
       } catch (ex: Exception) {
           log.error("Matching response failed", ex)
            // Behavior depends on the task's errors.tolerance setting:
            // * processing stops if "errors.tolerance": "none"
            // * processing continues and the record is sent to the dead letter queue
            //   if "errors.tolerance": "all"
           context.errantRecordReporter().report(record, ex)
       }
   }

   protected fun SinkRecord.toHttpRequestSafe() = try {
       toHttpRequest()
   } catch (ex: Exception) {
       log.error("Invalid record", ex)
        // Behavior depends on the task's errors.tolerance setting:
        // * processing stops if "errors.tolerance": "none"
        // * processing continues and the record is sent to the dead letter queue
        //   if "errors.tolerance": "all"
       context.errantRecordReporter().report(this, ex)
       null
   }

    // Struct accessors for the HTTP_REQUEST_SCHEMA fields, implied by the original
    // code; getMap returns an untyped map, hence the cast
    private fun Struct.getHttpUrl(): String = getString(FIELD_HTTP_URL)
    private fun Struct.getHttpMethod(): String = getString(FIELD_HTTP_METHOD)
    private fun Struct.getHttpBody(): String? = getString(FIELD_HTTP_BODY)
    @Suppress("UNCHECKED_CAST")
    private fun Struct.getHttpHeaders(): Map<String, String>? =
        getMap(FIELD_HTTP_HEADERS) as Map<String, String>?

    protected fun SinkRecord.toHttpRequest() =
       (value() as Struct)
           .let { httpStruct ->
               HttpRequest.newBuilder()
                   .uri(URI.create(httpStruct.getHttpUrl()))
                   .method(
                       httpStruct.getHttpMethod(),
                       httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody()
                   )
                   .apply {
                       httpStruct.getHttpHeaders()?.forEach { (k, v) -> header(k, v) }
                   }
                   .build()
           }
}

Implementation of the connector class:

class HttpSinkConnector : SinkConnector() {

   private val log = LoggerFactory.getLogger(HttpSinkConnector::class.java)
   private lateinit var settings: Map<String, String>

   override fun version(): String = "1.0"

   override fun start(props: MutableMap<String, String>) {
       log.info("Starting HttpSyncSinkConnector...")
       settings = props
   }

    override fun taskClass(): Class<out Task> = HttpSinkTask::class.java
   override fun taskConfigs(maxTasks: Int): List<Map<String, String>> = 
      List(maxTasks) { settings }
   override fun stop() {}
   override fun config(): ConfigDef = HttpSinkConfig.configDef

   override fun validate(connectorConfigs: Map<String, String>): Config {
       return super.validate(connectorConfigs)
   }
}

So far this is a simple connector implementation where records are processed strictly one at a time. Let's optimize it a little by replacing the synchronous HttpClient.send with the asynchronous HttpClient.sendAsync. The idea is to send several requests in parallel, processing records in batches. This can be more efficient in some cases, provided the HTTP server is designed for it.

To be able to limit the maximum number of parallel requests, let's introduce the SinkRecordGrouper interface, which splits the list of records arriving at SinkTask.put into sublists: the sublists themselves are processed strictly sequentially, while the elements within a sublist are processed in parallel:

interface SinkRecordGrouper : (List<SinkRecord>) -> List<List<SinkRecord>> {
   override fun invoke(records: List<SinkRecord>): List<List<SinkRecord>>
   fun init(props: Map<String, String>)
}

Here is an implementation that, besides splitting the batch into sublists of limited size, also groups records by key: records with the same key are placed in different sublists, so they are processed strictly sequentially rather than in parallel; some use cases require exactly this (a usage sketch follows the class).

class KeyGrouper : SinkRecordGrouper {

   private var parallelCount by Delegates.notNull<Long>()

   override fun invoke(records: List<SinkRecord>): List<List<SinkRecord>> {
       val result = mutableListOf<List<SinkRecord>>()
        val batch = records.toMutableList()

       while (batch.isNotEmpty()) {
           val keySet = mutableSetOf<Any>()
           val subResult = mutableListOf<SinkRecord>()
           for (r in batch) {
               if (r.key() !in keySet) {
                   subResult.add(r)
                   keySet.add(r.key())
               }

               if (subResult.size >= parallelCount)
                   break
           }
           batch.removeAll(subResult)
           result.add(subResult)
       }
       return result
   }


   override fun init(props: Map<String, String>) {
       val config = AbstractConfig(
           ConfigDef()
               .define(
                   PARALLEL_COUNT,
                   ConfigDef.Type.LONG,
                   5,
                   ConfigDef.Importance.LOW,
                   "How many requests to send in parallel"
               ),
           props
       )
       parallelCount = config.getLong(PARALLEL_COUNT)
   }

   companion object {
       private const val PARALLEL_COUNT = "grouper.parallelCount"
   }
}
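A quick illustration of the grouping behavior (the topic name and offsets are hypothetical, purely for the example): with parallelCount = 2 and keys A, A, B, C, the two records keyed A land in different sublists and are never sent in parallel:

fun record(key: String, offset: Long) = SinkRecord(
    "topic", 0, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, "value", offset
)

val grouper = KeyGrouper().apply { init(mapOf("grouper.parallelCount" to "2")) }
val groups = grouper(listOf(record("A", 0), record("A", 1), record("B", 2), record("C", 3)))
// groups == [[A@0, B@2], [A@1, C@3]]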

Here is the updated HttpSinkTask implementation using HttpClient.sendAsync and KeyGrouper:

class HttpSinkTask : SinkTask() {

   companion object {
        private val log = LoggerFactory.getLogger(HttpSinkTask::class.java)
   }

   private lateinit var config: HttpSinkConfig
   private lateinit var httpClient: HttpClient
   private lateinit var responseValidator: HttpResponseValidator
   private lateinit var grouper: SinkRecordGrouper
  
   override fun version(): String = "1.0"
   override fun stop() {}
   override fun flush(currentOffsets: MutableMap<TopicPartition, OffsetAndMetadata>) {
   }

   override fun start(props: Map<String, String>) {
       log.info("Starting http sink task...")
       config = HttpSinkConfig(props)
       httpClient = config.httpClient()
       responseValidator = config.responseValidator()
       grouper = KeyGrouper().apply { init(props) }
   }


   override fun put(records: Collection<SinkRecord>) {
       if (records.isEmpty()) return


       log.debug(
           "Received {} records. First record kafka coordinates:({}-{}-{}).",
           records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset()
       )
      
       grouper(records.toList()).forEach { subRecords ->
           subRecords.map { record ->
               record.toHttpRequestSafe()
                   ?.let { request ->
                        request.sendAsync()
                            .exceptionally { ex ->
                                // The batch will be reprocessed after backoffTimeoutMs
                                log.error("Error handling response, http request: $request", ex)
                                log.info("Context timeout before retry")
                                context.timeout(config.backoffTimeoutMs())
                                log.info("Throw exception after context timeout")
                                throw RetriableException(ex)
                            }
                            .thenApply { response -> response.validate(record) }
                   }
           }
               .forEach {
                   try {
                       it?.join()
                   } catch (ex: CompletionException) {
                       ex.cause?.let { throw it }
                   }
               }
       }
   }


   private fun HttpRequest.sendAsync() = try {
       httpClient.sendAsync(this, HttpResponse.BodyHandlers.ofString())
   } catch (ex: Exception) {
        // The batch will be reprocessed after backoffTimeoutMs
       log.error("Error during sending http request: $this", ex)
       log.info("Context timeout before retry")
       context.timeout(config.backoffTimeoutMs())
       log.info("Throw exception after context timeout")
       throw RetriableException(ex)
   }

   protected fun HttpResponse<String>.validate(record: SinkRecord) {
       try {
           responseValidator(this)
       } catch (ex: RetriableException) {
            // The messages will be reprocessed after backoffTimeoutMs
           log.info("Context timeout before retry")
           context.timeout(config.backoffTimeoutMs())
           log.info("Throw exception after timeout")
           throw ex
       } catch (ex: Exception) {
           log.error("Matching response failed", ex)
            // Behavior depends on the task's errors.tolerance setting:
            // * processing stops if "errors.tolerance": "none"
            // * processing continues and the record is sent to the dead letter queue
            //   if "errors.tolerance": "all"
           context.errantRecordReporter().report(record, ex)
       }
   }

   protected fun SinkRecord.toHttpRequestSafe() = try {
       toHttpRequest()
   } catch (ex: Exception) {
       log.error("Invalid record", ex)
       // report a record processing error to the context
       // depending on the error handling strategy settings:
       // * processing is stopped ( "errors.tolerance": "none" )
       // * processing is continued and this record is sent to the dead letter ( "errors.tolerance": "all" )
       context.errantRecordReporter().report(this, ex)
       null
   }

   protected fun SinkRecord.toHttpRequest() =
       (value() as Struct)
           .let { httpStruct ->
               HttpRequest.newBuilder()
                   .uri(URI.create(httpStruct.getHttpUrl()))
                   .method(
                       httpStruct.getHttpMethod(),
                       httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody()
                   )
                   .apply {
                       httpStruct.getHttpHeaders()?.forEach { (k, v) -> header(k, v) }
                   }
                   .build()
           }
}

What the task configuration might look like:

{
 "connector.class": "ru.typik.kafka.connect.HttpAsyncSinkConnector",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "ru.typik.kafka.connect.converter.ProtobufConverter",
 "value.converter.protoClassName": "ru.typik.debt.proto.NotificationModel$NotificationData",
 "consumer.override.group.id": "${tpp:consumer-group}",
 "auto.create": "false",
 "tasks.max": "1",
 "topics": "${tpp:topic}",
 "errors.tolerance": "all",
 "errors.log.enable": true,
 "errors.log.include.messages": true,
 "errors.deadletterqueue.topic.name": "${tpp:deadLetter}",
 "errors.deadletterqueue.topic.replication.factor": "${tpp:replication-factor}",
 "errors.deadletterqueue.context.headers.enable": true,
 "transforms": "http",
 "transforms.http.type": "ru.typik.HttpTransform",
 "grouper.parallelCount": "50",
 "backoffTimeoutMs": "${tpp:backoffTimeoutMs}",
 "response.validator": "ru.typik.kafka.connect.task.StatusResponseValidator",
 "response.validator.codes.success": "200",
 "response.validator.codes.error": "400"
}

What an HttpTransform implementation might look like:

class HttpTransform<R : ConnectRecord<R>> : Transformation<R> {

   override fun close() {}

   // Transformation also requires config() and configure(); trivial stubs here
   override fun config(): ConfigDef = ConfigDef()
   override fun configure(configs: Map<String, *>) {}

   protected fun Struct.getMethod(): String = "GET"
   protected fun Struct.getUrl(): String = "http://localhost:8080"
   protected fun Struct.getBody(): String? = "{}"
   protected fun Struct.getHeaders(): Map<String, String>? = mapOf(
       "Content-Type" to "application/json",
       "Accept" to "application/json"
   )

   override fun apply(record: R): R =
       record.newRecord(
           record.topic(),
           record.kafkaPartition(),
           record.keySchema(),
           record.key(),
           HTTP_REQUEST_SCHEMA,
           (record.value() as Struct).let { struct ->
               Struct(HTTP_REQUEST_SCHEMA)
                   .put(FIELD_HTTP_METHOD, struct.getMethod())
                   .put(FIELD_HTTP_URL, struct.getUrl())
                   .put(FIELD_HTTP_BODY, struct.getBody())
                   .put(FIELD_HTTP_HEADERS, struct.getHeaders())
           },
           record.timestamp()
       )
}

I tested this connector implementation with different parallelCount values and built graphs in Grafana using the standard Kafka Connect metrics. As the HTTP server I used WireMock, configured to respond with a slight delay.

The WireMock configuration:

{
 "mappings": [
   {
     "priority": 1,
     "request": {
       "method": "GET",
       "urlPattern": "/"
     },
     "response": {
       "status": 200,
       "fixedDelayMilliseconds": 200,
       "headers": {
         "Content-Type": "application/json"
       }
     }
   }
 ]
}

The resulting graphs:

[Load testing graphs]

This is a fairly artificial test scenario: whether parallelizing requests brings any benefit depends entirely on the HTTP server implementation. Another possible optimization is to group HTTP requests into batches and send each batch as a single request, but in that case the HTTP server must also be ready for such specific processing logic; a sketch of the idea is below.
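A hypothetical sketch of that batching idea (the /batch endpoint and the JSON-array payload format are assumptions, not something the connector above implements):

// Merge the bodies of several records into a single JSON-array request;
// the server must know how to unpack and process such a payload.
fun toBatchRequest(structs: List<Struct>): HttpRequest =
    HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8080/batch"))
        .header("Content-Type", "application/json")
        .POST(
            HttpRequest.BodyPublishers.ofString(
                structs.joinToString(prefix = "[", postfix = "]") { it.getString(FIELD_HTTP_BODY) }
            )
        )
        .build()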
