Why streaming in KSQL and Kafka Streams is not easy

Hello, Habr!

My name is Sasha, and I am a lead developer at GlowByte Consulting. My team and I built a solid streaming engine for a large bank. It is now in production, handling online processing of card authorizations, customer visits to branch offices, and a number of smaller processes, all running on KSQL and Kafka Streams. I want to share the rakes we stepped on along the way.
If you are interested in the details, read on.

For those out of the loop

If you are not familiar with Kafka, I recommend reading the introduction on the official Apache website (kafka.apache.org/intro) or on the Confluent website (docs.confluent.io/current/kafka/introduction.html), so that it is clear what I am talking about.

If you have worked with Kafka as a messaging bus, you might ask: how do you build a streaming engine on top of it? The answer: with the Kafka Streams API and KSQL.

Kafka Streams is a Java API that lets you move data from topic to topic, applying various transformations to that data along the way.
KSQL is an add-on on top of Kafka Streams that lets you write in an SQL-like language instead of Java code; the Kafka Streams applications are then generated from it automatically.
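For illustration, here is a minimal KSQL sketch (the topic name and fields are hypothetical, not from our project): the first statement registers an existing topic as a stream, and the second creates a persistent query that filters and transforms it into a new topic. Under the hood, KSQL turns the second statement into a running Kafka Streams application.

```sql
-- Register an existing topic as a stream (hypothetical example):
CREATE STREAM authorizations (card_id VARCHAR, amount DOUBLE, merchant VARCHAR)
  WITH (KAFKA_TOPIC='authorizations', VALUE_FORMAT='JSON');

-- A persistent query: filter and transform, writing results to a new topic:
CREATE STREAM large_purchases AS
  SELECT card_id, amount, UCASE(merchant) AS merchant
  FROM authorizations
  WHERE amount > 10000;
```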

What have we done


GlowByte Solution Architecture

Our Kafka PROD cluster consists of 3 nodes. Each node's configuration looks roughly like this:

  • 48 CPU cores (96 with HT) at 2.1 GHz;
  • 512 GB of RAM;
  • 12x 10 TB HDD;
  • 40 Gbit/s internal network.

The input data sources are:

  • Flat files arriving on a network drive once per minute;
  • Online data from data marts in Oracle, replicated via Oracle GoldenGate;
  • Slow-changing data marts and reference tables needed for processing, loaded from Hadoop.

All input data is processed in real time and written to Kafka topics, which are then consumed by SAS RTDM (Real-Time Decision Manager) and also loaded into the HBase NoSQL database, where SAS RTDM performs point lookups. Based on this, SAS RTDM decides whether to run a marketing communication via SMS, push notification, etc.

Sample scenario (not necessarily real)

A customer makes a card purchase in a store. The system determines that the store is located in a large shopping mall that also contains partner stores of the bank that issued the card. Almost immediately after the purchase, the customer receives a message offering to visit those stores as well.
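A sketch of how such a scenario might look in KSQL (all names and fields here are hypothetical, and the exact table syntax differs between KSQL versions): purchases arrive as a stream, the merchant directory is a table, and a stream-table join enriches each purchase with the mall it happened in. Downstream logic can then look up partner stores in the same mall and trigger the communication.

```sql
-- Purchases as a stream (hypothetical names):
CREATE STREAM purchases (card_id VARCHAR, merchant_id VARCHAR, amount DOUBLE)
  WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');

-- Merchant directory as a table, keyed by merchant:
CREATE TABLE merchants (merchant_id VARCHAR, mall_id VARCHAR)
  WITH (KAFKA_TOPIC='merchants', VALUE_FORMAT='JSON', KEY='merchant_id');

-- Enrich each purchase with the mall it took place in:
CREATE STREAM purchases_in_malls AS
  SELECT p.card_id, p.amount, m.mall_id
  FROM purchases p
  JOIN merchants m ON p.merchant_id = m.merchant_id;
```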

We implemented almost all of the logic in KSQL scripts. Unlike regular SQL, they are not that simple to write, but they are still easier than Java code. In addition, they can be fixed without recompilation (convenient for changing simple filters), and they can be read by someone completely unfamiliar with Java, or with classical programming in general.
We used the Kafka Streams API only where KSQL was powerless, for example for complex one-to-many joins.

How to and how not to

Now to the main point of the post: the rakes we stepped on.

DON'T: Hope that everything will work out of the box.
DO: Study all the technical details.

It would seem an obvious point, but when you implement a large and technically complex solution, you will stumble over something anyway. Our main problems were:

  • We used the same default number of partitions everywhere.
    Partitioning in Kafka is what controls the available parallelism. We set 20 partitions everywhere, "with a margin". As a result, the large number of partitions significantly increased the load on the disks, and our drives are ordinary HDDs, albeit large ones;
  • The parameters of the Kafka brokers, ZooKeeper, and so on need serious tuning.
    Several times, when I tried to restart Kafka in a planned way (to change a couple of minor parameters), it simply would not come back up. I had to troubleshoot quickly, most often by increasing one timeout or another in the configs.
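Rather than relying on the cluster-wide default, the partition count of a topic created by KSQL can be set explicitly per stream, matched to the parallelism that process actually needs. A sketch (stream and topic names are hypothetical):

```sql
-- Set partitions and replicas explicitly for the output topic
-- instead of inheriting the cluster default:
CREATE STREAM filtered_events
  WITH (KAFKA_TOPIC='filtered_events', PARTITIONS=3, REPLICAS=3) AS
  SELECT * FROM events
  WHERE event_type = 'purchase';
```

A low-traffic process with 3 partitions instead of 20 means far fewer open segment files and much less random IO on the disks.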

Disks and hardware

DON'T: Use the same drives for everything simply because there is enough space.
DO: Choose the hardware configuration (especially the disks) carefully.


The main problem we ran into was using ordinary HDDs for the state store.
Kafka Streams, and therefore KSQL, always maintains a state store (RocksDB) underneath for joins and aggregate calculations. The state store generates a very large number of read/write operations; frankly, we did not expect how many. One ordinary HDD can handle (in terms of IO) a couple of medium KSQL processes (two or three joins) or one very large one (five joins plus an aggregation). But we have dozens of processes, only 12 disks per node, and almost all of them are already occupied by Kafka topics, so piling extra load on top of them is a bad idea.
As a result, we moved almost the entire state store into RAM. It works fine, but it takes up a few hundred GB of memory.
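One way to keep the state store in RAM (a sketch of the general approach, not necessarily exactly what we did; the paths are examples) is to point the Kafka Streams state directory at a tmpfs mount, so the RocksDB files physically live in memory. In a KSQL server, Streams settings are passed through with the `ksql.streams.` prefix:

```properties
# ksql-server.properties (sketch; mount the tmpfs beforehand, e.g.
#   mount -t tmpfs -o size=256G tmpfs /mnt/tmpfs)
# Point the Streams state directory at the RAM-backed mount:
ksql.streams.state.dir=/mnt/tmpfs/ksql-state
```

The obvious trade-off: after a node restart the state is gone and has to be rebuilt from the changelog topics, so recovery takes longer.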

Paradigm shift

DON'T: Simply port SQL prototypes to streaming and expect everything to work.
DO: Move from the ETL paradigm to the streaming paradigm.

Our company is very active in developing scheduled ETL processes for various customers. An ETL process is usually a batch calculation performed once a day, once a week, or even less often. In more than 90% of cases, under all the wrappers, an ETL process is a sequential execution of SQL queries against some database.
Since we were writing in KSQL, at first it seemed quite logical to simply adapt all the SQL prototypes produced by the analysts to KSQL syntax. It turned out that this does not work: KSQL is only an SQL-like language, and it works in a completely different way.

Below is a picture showing the difference between updating data in a database and in Kafka. Records in Kafka are never updated in place: the old record remains, and a new one is appended.
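KSQL exposes this difference through its two abstractions over the very same topic: a stream sees every appended record, while a table materializes only the latest record per key, which is the closest analogue of an "update" in a database. A sketch with hypothetical names:

```sql
-- A stream over the topic: every record flows through,
-- including all the "old" versions of a key:
CREATE STREAM balance_events (card_id VARCHAR, balance DOUBLE)
  WITH (KAFKA_TOPIC='balance_updates', VALUE_FORMAT='JSON');

-- A table over the same topic: only the latest record per key
-- is visible, older records are superseded:
CREATE TABLE balance_current (card_id VARCHAR, balance DOUBLE)
  WITH (KAFKA_TOPIC='balance_updates', VALUE_FORMAT='JSON', KEY='card_id');
```

When porting an SQL prototype, every UPDATE in the original has to be rethought in these terms.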


Another example, this time with record shuffling. During processing, two messages may start out in the same partition, then have their key changed, and end up in the same partition again, but their relative order may have changed along the way. And since in some cases the order matters, we get a problem.
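In KSQL, the re-keying that triggers this shuffling happens whenever you change the partitioning key, for example with PARTITION BY (names are hypothetical):

```sql
-- Re-keying a stream forces a repartition: records are written to an
-- internal repartition topic and redistributed by the new key:
CREATE STREAM purchases_by_merchant AS
  SELECT *
  FROM purchases
  PARTITION BY merchant_id;
-- Records that were ordered within one partition of 'purchases' can
-- interleave with records from other partitions after the shuffle,
-- so their relative order is no longer guaranteed.
```

The same implicit repartition occurs before joins and aggregations whenever the data is not already keyed by the join/grouping column.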


Below is an example of adapting one script that was considered quite simple in the ETL paradigm:

SQL script:

  • 95 lines of code;
  • 1 step;
  • 1 full join;
  • about 1-2 person-days for development and testing.

KSQL script:

  • 874 lines of code;
  • 12 steps;
  • 2 left joins + 1 group by;
  • 40+ person-days for development and testing.
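To give a feel for where the blow-up comes from, here is a heavily simplified sketch of emulating a full join of two streams with left joins plus a grouped merge, the shape our adaptation took (all names are hypothetical, and the real script had 12 steps rather than these 4):

```sql
-- Left join in each direction (stream-stream joins in KSQL are windowed):
CREATE STREAM a_left_b AS
  SELECT a.id AS id, a.val AS a_val, b.val AS b_val
  FROM a LEFT JOIN b WITHIN 1 HOURS ON a.id = b.id;

CREATE STREAM b_left_a AS
  SELECT b.id AS id, a.val AS a_val, b.val AS b_val
  FROM b LEFT JOIN a WITHIN 1 HOURS ON b.id = a.id;

-- Union both directions into one stream:
CREATE STREAM a_b_union AS SELECT * FROM a_left_b;
INSERT INTO a_b_union SELECT * FROM b_left_a;

-- Collapse the duplicates produced for keys matched on both sides:
CREATE TABLE a_full_b AS
  SELECT id, MAX(a_val) AS a_val, MAX(b_val) AS b_val
  FROM a_b_union
  GROUP BY id;
```

Every intermediate stream here is a real topic with its own state, which is exactly why a one-step SQL query grows into a dozen KSQL steps.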

Batches instead of streaming

DON'T: Force into streaming what streaming cannot handle.
DO: Choose carefully which tasks to do in streaming and which not to.

Initially, the plan was to implement all the required processes on KSQL and Kafka Streams. Along the way it turned out that some things, whose timeliness is not that critical but whose implementation complexity is very high, are better moved into batches. Such batches can be run once an hour, several times a day, or less often, depending on criticality. That is clearly better than computing absolutely everything online.
For ourselves, we decided:

Can be done in streaming:

  • Filtration;
  • Scalar transformations of attributes;
  • Point lookups;
  • Simple aggregations.

It is better to put into a batch, if possible:

  • One-to-many and many-to-many joins;
  • Complex aggregations;
  • Recalculations over old dates.
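The four streaming-friendly operation types can all be expressed in a few lines of KSQL each; a sketch with hypothetical stream and table names:

```sql
-- Filtration:
CREATE STREAM big_tx AS
  SELECT * FROM tx WHERE amount > 1000;

-- Scalar transformation of attributes:
CREATE STREAM tx_norm AS
  SELECT card_id, amount * 0.01 AS amount_units FROM tx;

-- Point lookup: a stream-table join against a reference table:
CREATE STREAM tx_enriched AS
  SELECT t.card_id, t.amount, c.segment
  FROM tx t
  JOIN clients c ON t.card_id = c.card_id;

-- Simple aggregation:
CREATE TABLE tx_per_card AS
  SELECT card_id, COUNT(*) AS cnt, SUM(amount) AS total
  FROM tx
  GROUP BY card_id;
```

The items in the second list are exactly the ones that blow up into multi-step pipelines with heavy state, as in the full-join example above.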

The recalculation problem

DON'T: Set a short retention on all incoming events.
DO: In online calculation tasks, be prepared to recalculate everything.

In real life, you can never be sure you got everything right the first time. It may turn out that calculations that had been running online for half a month contained an error (or the requirements changed retroactively). For stateless operations this does not matter: what has passed is gone. But if, for example, aggregates are computed online (say, a client's card turnover), this is a problem: if nothing is done, they will keep being computed incorrectly, so everything has to be recalculated. For this reason, in some cases we keep data in Kafka topics for up to three months. And recalculation in streaming is not a simple matter.

In a database

  1. The required data mart is recalculated for the required dates;
  2. The data in the target table is replaced with the recalculated data.

In streaming

  1. Online data processing is disabled;
  2. The state store is cleared;
  3. The offsets are reset;
  4. The data is reprocessed;
  5. If necessary, the data is uploaded to the database.
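In KSQL terms, these steps map roughly to the following sketch (the query ID and names are hypothetical; dropping the old persistent query also abandons its state):

```sql
TERMINATE CSAS_TX_ENRICHED_0;          -- 1. stop online processing
DROP STREAM tx_enriched;               -- 2. drop the derived stream; the old
                                       --    query's state is no longer used
SET 'auto.offset.reset' = 'earliest';  -- 3. make the new query start from the
                                       --    beginning of the input topics
CREATE STREAM tx_enriched AS           -- 4. reprocess all retained data
  SELECT card_id, amount
  FROM tx
  WHERE amount > 1000;
```

This only works while the source topics still hold the history, which is exactly why we keep up to three months of retention on some of them.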

Instead of a conclusion

Of course, I have described far from all the problems we encountered while implementing the streaming engine, but these are definitely among the main rakes that come to mind first. If some points seemed obvious or commonplace to anyone, I apologize. I sincerely hope it was not too boring, and that this article helps at least someone.
