Kafka as a directory store

What is the article about?

I would like to share how we use Kafka to organize online storage of reference data (directories).

System architecture and requirements

I'm currently working on a fairly high-load project for processing financial transactions. The project consists of many services, some of which can be loosely grouped into the transactional online flow – the part responsible for reliably and quickly processing incoming transactions (debiting payments and issuing a receipt).

Kafka occupies the central place in this part of the system: input data is processed, enriched and transferred from topic to topic, and in the end the processed transactions must be saved in the master data storage (Postgres), which is later used for reporting, reconciliations and other interactions with system users.

The online flow has a requirement that transaction processing must not be affected by temporary unavailability of Postgres (for example, infrastructure maintenance lasting 2-3 hours, during which the database is unavailable, is acceptable). At the same time, Kafka is assumed to be a highly available system. That is, if the database is unavailable, it is acceptable that system users may not receive up-to-date reports for some time, but the financial operations of debiting money and issuing receipts must continue to work as long as the external bank services and Kafka are available.

Directories

The services need directories, which are periodically updated by the support team. Directories are data or settings required to process transactions.

In this article, we’ll take the small directory terminal_acquirer as an example: each terminal_id corresponds to an acquirer_id. Based on this mapping, one of the services decides which bank to send the authorization to.

In reality, directories are tables of 5-10 fields and several thousand records that are occasionally updated and extended.

Kafka as a directory storage

For the support team it is convenient to maintain these directories in a relational database (in our case Postgres): it is easy to query and update the data, there are standard clients, reliable storage and ACID guarantees.

To cover the requirement described above for the online flow part – that services should “not notice” the temporary unavailability of Postgres – we apply the approach described in Confluent guides on building streaming applications, with the only difference that we will not use Kafka Streams.

In short it will look like this:

  1. Directory data is edited in Postgres.

  2. The Kafka JDBC Source Connector uploads the changes to a Kafka topic.

  3. Inside each service we place a component that reads all messages from the Kafka topic at startup and then keeps listening to it, building an in-memory structure that other components of the service access.

Let's illustrate this:

[solution illustration]

Kafka Connector

You can read about the Kafka Connect framework and the JDBC Source Connector in the Confluent documentation.

Kafka JDBC Source Connector is used to periodically download updates from the database to the Kafka topic.

It is important to choose the record key correctly – in most cases it should match a unique index in the database and be immutable.

Our services use the standard Kafka JDBC Source Connector with mode = timestamp+incrementing. You can read about the standard JDBC Source Connector, its modes and settings in the Confluent documentation. In this mode you need to add a service timestamp column and an auto-incrementing service id column to the table.

Let's illustrate how inserting and updating rows in the table is reflected in the Kafka topic:

Postgres      | Kafka offset | Kafka key  | Kafka value
insert row1   | 0            | key(row1)  | value(row1)
insert row2   | 1            | key(row2)  | value(row2)
update row1   | 2            | key(row1)  | value(row1)

Here it is important that key(row1) at offset 0 == key(row1) at offset 2, while value(row1) at offset 0 != value(row1) at offset 2.

With this approach, deleting a record from the database is not reflected in the Kafka topic at all, because the standard JDBC Source Connector essentially performs a periodic select. To be able to deactivate records and later delete them, you can introduce a service field isActive and a constraint that prohibits deleting rows with isActive = true (or forbids deletion altogether). The connector can then send a tombstone when isActive = false, or you can send the value as is and leave this logic to the consumer component.

You can also use the Debezium source connector for Postgres, in which, as I understand it, such tricks with service fields are not needed, because it reads the transaction log of the Postgres master. But in that case you need to ensure that the connector is always connected to the current Postgres master node, which can also be non-trivial.

Thus, any service that listens to the Kafka topic can build an up-to-date replica of the directory. Remembering that key(row i) never changes, we can apply the compacted cleanup policy to the topic: since we are only interested in the latest record for each key, we let Kafka delete older records for the same keys in accordance with this policy.
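For illustration, here is a minimal sketch of creating such a compacted topic with the Kafka AdminClient; the broker address and topic name (prefix plus table name) are assumptions, and in practice the topic may well be created by other tooling:

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.config.TopicConfig

fun main() {
    // Hypothetical broker address.
    val admin = AdminClient.create(
        mapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092")
    )
    // Hypothetical topic name: topic.prefix + table name.
    val topic = NewTopic("reference-data.terminal_acquirer", 1, 1.toShort())
        // Keep only the latest record per key.
        .configs(mapOf(TopicConfig.CLEANUP_POLICY_CONFIG to TopicConfig.CLEANUP_POLICY_COMPACT))
    admin.createTopics(listOf(topic)).all().get()
    admin.close()
}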

Let's return to the terminal_acquirer example. Here are the scripts for creating the directory in the database with all the preconditions described above:

CREATE TABLE terminal_acquirer (
	id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY,
	terminal_id text NOT NULL,
	acquirer_id text NOT NULL,
	is_active bool NOT NULL DEFAULT true,
	updated_date timestamp NOT NULL DEFAULT timezone('utc'::text, CURRENT_TIMESTAMP),
	CONSTRAINT terminal_acquirer_pkey PRIMARY KEY (id)
);

CREATE UNIQUE INDEX terminal_acquirer_unique_idx ON terminal_acquirer USING btree (terminal_id) WHERE (is_active IS TRUE);

-- Function that refreshes updated_date and forbids changing terminal_id
CREATE OR REPLACE FUNCTION fun_terminal_acquirer_update()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
begin
  if new.terminal_id <> old.terminal_id then
    raise exception 'terminal_id cannot be changed';
  end if;
  new.updated_date = current_timestamp at time zone 'utc';
  return new;
end;
$function$;

-- Function that prevents deletion of active rows
CREATE OR REPLACE FUNCTION fun_terminal_acquirer_delete()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
begin
  if old.is_active <> false then
    raise exception 'Active settings (is_active = true) can not be deleted. Make it inactive first (is_active = false)';
  end if;
  return old;
end;
$function$;

-- Trigger that prevents deletion from the table
create trigger trigger_terminal_acquirer_delete before
delete on terminal_acquirer for each row execute function fun_terminal_acquirer_delete();

-- Trigger that refreshes updated_date on update
create trigger trigger_terminal_acquirer_update before
update on terminal_acquirer for each row execute function fun_terminal_acquirer_update();

Kafka Connect task configuration, which uploads changes from the database table to the Kafka topic in the format key = id and value = json(row):

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "${tpp:reference-data.db.url}",
  "connection.user": "${tpp:reference-data.db.user}",
  "connection.password": "${tpp:reference-data.db.password}",
  "dialect.name": "PostgreSqlDatabaseDialect",

  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "table.whitelist": "terminal_acquirer",
  "topic.prefix": "${tpp:kafka.topic.prefix}",

  "mode": "timestamp+incrementing",
  "incrementing.column.name": "id",
  "timestamp.column.name": "updated_date",

  "transforms": "changeCase,copyId,extractId,updatedDateFormat",

  "transforms.changeCase.type": "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value",
  "transforms.changeCase.from": "LOWER_UNDERSCORE",
  "transforms.changeCase.to": "LOWER_CAMEL",

  "transforms.copyId.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.copyId.fields": "id",

  "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractId.field": "id",

  "transforms.updatedDateFormat.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.updatedDateFormat.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.updatedDateFormat.target.type": "string",
  "transforms.updatedDateFormat.field": "updatedDate"
}
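For reference, such a configuration can be registered on a Kafka Connect worker via its REST API (PUT /connectors/{name}/config creates the connector or updates its configuration). A minimal sketch, assuming a hypothetical worker URL, connector name and a local file holding the JSON above:

import java.io.File
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

fun main() {
    // Hypothetical worker URL, connector name and config file containing the JSON above.
    val connectUrl = "http://localhost:8083"
    val connectorName = "terminal-acquirer-source"
    val config = File("terminal-acquirer-source.json").readText()

    val request = HttpRequest.newBuilder()
        .uri(URI.create("$connectUrl/connectors/$connectorName/config"))
        .header("Content-Type", "application/json")
        .PUT(HttpRequest.BodyPublishers.ofString(config))
        .build()

    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    println("Connect responded: ${response.statusCode()} ${response.body()}")
}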

Let’s illustrate the correspondence between events in the database and Kafka:

Postgres: INSERT INTO terminal_acquirer (terminal_id, acquirer_id) VALUES('1', '1');
Kafka:    offset = 0, key = 1,
          value = {"id": "1", "terminalId": "1", "acquirerId": "1", "isActive": true, "updatedDate": "2024-01-01 11:11:11.000"}

Postgres: INSERT INTO terminal_acquirer (terminal_id, acquirer_id) VALUES('2', '2');
Kafka:    offset = 1, key = 2,
          value = {"id": "2", "terminalId": "2", "acquirerId": "2", "isActive": true, "updatedDate": "2024-01-01 11:11:12.000"}

Postgres: UPDATE terminal_acquirer SET acquirer_id = '3' WHERE terminal_id = '1';
Kafka:    offset = 2, key = 1,
          value = {"id": "1", "terminalId": "1", "acquirerId": "3", "isActive": true, "updatedDate": "2024-01-01 11:11:13.000"}

Kafka Store Component

The Kafka Streams framework has the concept of a store, where Kafka topics act as persistent storage, and it offers different implementations of working with a topic as a store. There is the Global Store – a store that contains all the data from all partitions of the topic – and the regular State Store, which is partitioned, i.e. an application instance holds only the data from the partitions assigned to that instance. In turn, there are various storage backends: for example, an in-memory store or a store backed by an embedded database such as RocksDB.

Our services historically use a reactive client to process messages. Kafka Streams imposes certain restrictions on parallelizing message processing (the number of parallel tasks is bounded by the number of partitions in the topic). Since processing messages from a Kafka topic involves calls to external services and requests to Cassandra, the reactive approach of processing messages in batches lets us use resources more efficiently.
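The reactive consumer itself is outside the scope of this article; purely for illustration, here is a sketch using reactor-kafka (an assumption – any reactive Kafka client would do) of how records can be processed with bounded concurrency against an external service; the topic, group id and authorize call are hypothetical:

import org.apache.kafka.clients.consumer.ConsumerConfig
import reactor.core.publisher.Mono
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions

// Hypothetical external call (e.g. a bank authorization request).
fun authorize(payload: String): Mono<String> = Mono.just("authorized: $payload")

fun main() {
    val options = ReceiverOptions.create<String, String>(
        mapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG to "transaction-processor",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer"
        )
    ).subscription(listOf("transactions"))

    KafkaReceiver.create(options)
        .receive()
        // Process up to 16 records concurrently against the external service.
        .flatMap({ record ->
            authorize(record.value())
                .doOnNext { record.receiverOffset().acknowledge() }
        }, 16)
        .subscribe()
}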

The in-memory Global Store concept seems very suitable for efficient and convenient work with directories in our services: we have a Kafka topic that holds the reference data uploaded from Postgres.

Let's implement this concept without Kafka Streams. When the application starts, we read all the messages that are in the directory topic at that moment, building an in-memory structure on the service side (this can be a ConcurrentHashMap). Only after that do we start actually processing business messages from Kafka, while continuing to listen to the directory topic and update the in-memory structure so that the directory replica on the service side stays up to date.

Here is an implementation of a base abstract class with metrics. It reads the entire topic when the application starts and then keeps listening for new messages, passing each record to the abstract process method, whose implementation is responsible for building the directory:

abstract class KafkaGlobalStore(
    private val kafkaStoreProperties: KafkaProperties,
    private val topicSource: List<String>,
    private val metricSchedulePeriod: Duration = Duration.ofSeconds(5)
) {

    private companion object : Log()

    private val properties = Properties().apply {
        setProperty("bootstrap.servers", kafkaStoreProperties.bootstrapServers)
        setProperty("auto.offset.reset", "none")
        setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

        if (kafkaStoreProperties.sslEnabled) {
            setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
            setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaStoreProperties.truststorePath)
            setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaStoreProperties.truststorePassword)
            setProperty(
                SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
                kafkaStoreProperties.sslEndpointIdentificationAlgorithm
            )
        }
    }

    // KafkaConsumer is not safe for multithreaded access
    private val consumer = KafkaConsumer<ByteArray?, ByteArray>(properties)
    private val metricConsumer = KafkaConsumer<ByteArray?, ByteArray>(properties)

    private lateinit var topicPartitions: List<TopicPartition>
    private val positions = ConcurrentHashMap<TopicPartition, Long>()
    private val errors = ConcurrentHashMap<Long, StoreError>()

    protected abstract fun process(record: ConsumerRecord<ByteArray?, ByteArray>)

    @PostConstruct
    fun init() {
        StoreMetrics.startInitTime(topicSource.toString())
        topicPartitions = topicSource.flatMap { topic ->
            consumer.partitionsFor(topic).map { TopicPartition(it.topic(), it.partition()) }
        }
        val highWatermarkBarrier = CountDownLatch(1)

        Thread {
            val consumer = KafkaConsumer<ByteArray?, ByteArray>(properties)
            consumer.assign(topicPartitions)
            consumer.seekToBeginning(topicPartitions)

            val highWatermarks = consumer.endOffsets(topicPartitions)

            while (true) {
                if (highWatermarks.isNotEmpty()) {
                    topicPartitions.forEach { partition ->
                        highWatermarks[partition]
                            ?.also { hwm ->
                                if (hwm <= consumer.position(partition)) {
                                    highWatermarks.remove(partition)
                                }
                            }
                    }
                    if (highWatermarks.isEmpty()) {
                        highWatermarkBarrier.countDown()
                    }
                }

                consumer.poll(Duration.ofMillis(kafkaStoreProperties.consumer.maxPollInterval.toLong()))
                    .also { batch ->
                        batch.partitions()
                            .forEach { topicPartition ->
                                batch.records(topicPartition).forEach { record ->
                                    kotlin.runCatching { process(record) }
                                        .onFailure { ex ->
                                            log.error(
                                                "Error processing record in store: key: ${record.key()} offset: ${record.offset()}",
                                                ex
                                            )
                                            errors[record.offset()] = ex.toError(record)
                                            StoreMetrics.updateMetric(StoreMetrics.Error, Tags.empty(), errors.size.toLong())
                                        }
                                }
                                val offset = consumer.position(topicPartition)
                                positions[topicPartition] = offset
                                StoreMetrics.updateMetric(StoreMetrics.Offset, topicPartition, offset)
                            }
                    }
            }
        }.start()
        Timer("store-metric-thread", true).schedule(0L, metricSchedulePeriod.toMillis()) {
            metricConsumer.endOffsets(topicPartitions).forEach { (topicPartition, value) ->
                StoreMetrics.updateMetric(StoreMetrics.Watermark, topicPartition, value)
            }
        }

        highWatermarkBarrier.await()
        StoreMetrics.finishInitTime(topicSource.toString())
    }

    fun endOffsets(): Map<TopicPartition, Long> = consumer.endOffsets(topicPartitions)
    fun positions(): Map<TopicPartition, Long> = positions.toMap()
    fun errors(): Map<Long, StoreError> = errors.toMap()

    private fun Throwable.toError(record: ConsumerRecord<ByteArray?, ByteArray>) =
        StoreError(record.offset(), record.key(), record.topic(), record.partition(), record.timestamp(), message, stackTraceToString())
}

Continuing the terminal_acquirer example, here is an implementation of the store for this directory:

class ReferenceDataStore(
   kafkaProperties: KafkaProperties,
   topicSource: List<String>,
   val objectMapper: ObjectMapper
) : KafkaGlobalStore(kafkaProperties, topicSource) {


   private val terminalAcquirerMap = ConcurrentHashMap<String,String>()
   override fun process(record: ConsumerRecord<ByteArray?, ByteArray>) {
       val referenceData = objectMapper.readReferenceData(record.value())
       if (referenceData.isActive)
           terminalAcquirerMap[referenceData.terminalId] = referenceData.acquirerId
       else
           terminalAcquirerMap.remove(referenceData.terminalId)
   }
  
   fun getAcquirerId(terminalId: String): String? = terminalAcquirerMap[terminalId]
}
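The readReferenceData helper and the corresponding DTO are not shown above; a minimal sketch, assuming Jackson with the Kotlin module and the JSON layout produced by the connector configuration above, could look like this:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue

// Hypothetical DTO matching the JSON value produced by the connector
// (field names after the LOWER_UNDERSCORE -> LOWER_CAMEL transform).
data class ReferenceData(
    val id: String,
    val terminalId: String,
    val acquirerId: String,
    val isActive: Boolean,
    val updatedDate: String
)

// Assumed helper used in ReferenceDataStore: deserializes the record value into the DTO.
fun ObjectMapper.readReferenceData(value: ByteArray): ReferenceData = readValue(value)

fun main() {
    val json = """{"id":"1","terminalId":"1","acquirerId":"1","isActive":true,"updatedDate":"2024-01-01 11:11:11.000"}"""
    val data = jacksonObjectMapper().readReferenceData(json.toByteArray())
    println(data.acquirerId) // prints 1
}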

With this implementation, our service holds a complete, current replica of the terminal_acquirer directory from the database in the form of a ConcurrentHashMap. This lets other components work with the directory efficiently, because all access goes to an up-to-date in-memory structure.
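For example, a hypothetical routing component (not part of the article's code) could consult the store like this when deciding which bank should authorize a transaction:

// Hypothetical caller of the store: resolves the acquirer for a terminal
// and fails fast if the directory has no mapping for it.
class AcquirerRouter(private val referenceDataStore: ReferenceDataStore) {
    fun resolveAcquirer(terminalId: String): String =
        referenceDataStore.getAcquirerId(terminalId)
            ?: error("No acquirer configured for terminal $terminalId")
}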

Resource consumption and loading time

With this approach, you need to carefully evaluate the size of the in-memory structure and the resources required to run the application, taking into account the peculiarities of JVM memory management. You also need to estimate the time required at application startup to read, deserialize and load all data from the reference topic into the in-memory structure.

At the moment all of our directories are small, and their load time and resource consumption do not require serious tuning of service settings. But you need to keep this in mind and test the application in case the amount of this data grows significantly.

For the terminal_acquirer example, I ran a test with the directory filled with 50,000 entries. The service started successfully with the following resource parameters:

limits:
  cpu: '2'
  memory: 2000Mi
requests:
  cpu: '2'
  memory: 1000Mi

The initialization time of the store at startup (reading all 50,000 values into memory) is ~2510 ms, which is not critical for a Spring Boot service. On average the directories in our services currently hold an order of magnitude less data, but even at this scale the loading time and resource consumption look satisfactory to us.

What to do if large directories appear

Currently, our directories are quite small and in the foreseeable future they are not expected to increase by an order of magnitude. Therefore, the In-Memory Global Store approach is sufficient. But what to do if directories that are many times larger appear?

There are 2 problems in this case:

  1. There is not enough memory for the in-memory store

  2. Reading and processing the entire topic at startup takes too long

In this case you can again borrow the approaches used in Kafka Streams, for example, use an embedded database instead of an in-memory structure. Kafka Streams uses RocksDB – a fast embedded key-value database. Using RocksDB reduces RAM consumption, which solves problem 1. Problem 2 can be solved by attaching a persistent volume, which allows the service not to re-read the entire topic on restart but to read only the new data.
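A minimal sketch of such a RocksDB-backed replacement for the ConcurrentHashMap, assuming the rocksdbjni library and a path on a persistent volume (persisting the consumed offsets is also needed but not shown here):

import org.rocksdb.Options
import org.rocksdb.RocksDB

// Hypothetical RocksDB-backed map for the terminal_acquirer directory:
// the same process() logic can put/remove entries here instead of a ConcurrentHashMap.
class RocksDbTerminalAcquirerStore(dbPath: String) : AutoCloseable {
    private val db: RocksDB

    init {
        RocksDB.loadLibrary()
        // dbPath should point to a persistent volume so the data survives restarts.
        db = RocksDB.open(Options().setCreateIfMissing(true), dbPath)
    }

    fun put(terminalId: String, acquirerId: String) =
        db.put(terminalId.toByteArray(), acquirerId.toByteArray())

    fun remove(terminalId: String) = db.delete(terminalId.toByteArray())

    fun get(terminalId: String): String? = db.get(terminalId.toByteArray())?.decodeToString()

    override fun close() = db.close()
}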

Conclusion

The approach above lets you use a Kafka topic as a data store in your services without pulling the Kafka Streams framework into the stack if for some reason you don't want to. Besides, Kafka Streams has a number of limitations – in particular, if you need to customize the storage, in my opinion it is inconvenient to do there – while this implementation makes it possible to organize the storage more flexibly.
