How to Build an Event Handling System from Scratch

Hello!

In this article, Alexander Shuvalov and Yuliyan Latypov share their experience of building a system that processes events in a data stream to enrich them with information and detect anomalies.

If you are not familiar with the terms below, I recommend that you read the following articles.

First, a little context: we are working on a UEBA-class (User and Entity Behavior Analytics) product. The data consists of logs from devices used by company employees.

We were given the following task: before writing data to the target table, enrich the events in-stream with additional information and identify anomalies in the data.

A little about the flow

We need to process 80,000 messages per second: logs from different operating systems (Windows, Linux, macOS) and specialized equipment (network routers).

You may have already encountered various materials on stream processing, which is not surprising, since the topic is popular in the era of Big Data. However, almost every such material is built around its own criteria and requirements for the data, the processing speed, and so on. Because of this, it is difficult to find a ready-made solution for the specific task you have been given. This is exactly what we ran into.

In our case, the key requirements on the flow were:

1) Ensuring a correct timeline of events; otherwise, some enrichment services would produce incorrect results.

2) Handling large events, which reduce consume/produce throughput and load the data transmission channel.

We faced the following tasks:

  1. Event type definition and filtering:

  • 45 event types, determined by message content

  • only events that pass filtering are processed further

  2. Data enrichment:

  • determining maturity for a user or host. In our case, maturity is the state of a user/host that has accumulated a certain number of events of each event type

  • enrichment with historical data

  • geolocation lookup by IP address

  • network zone lookup by IP address

  • determining the device group

  3. Definition of anomalies:

  • first user activity

  • inactive user. We consider a user inactive if no events have been received from them for a sufficiently long period of time.
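To make the maturity, first-activity, and inactivity definitions above concrete, here is a minimal sketch of a per-entity tracker. The threshold and window values, class and field names are our own assumptions for illustration, not the production values:

```python
from collections import defaultdict

MATURITY_THRESHOLD = 100            # assumed: events per type before an entity is "mature"
INACTIVITY_WINDOW = 30 * 24 * 3600  # assumed: 30 days without events => inactive

class EntityTracker:
    """Tracks per-user/host event counts and last-seen timestamps."""
    def __init__(self):
        self.counts = defaultdict(lambda: defaultdict(int))  # entity -> event_type -> count
        self.last_seen = {}                                  # entity -> unix timestamp

    def observe(self, entity, event_type, ts):
        """Record one event; returns True if this is the entity's first activity."""
        first_activity = entity not in self.last_seen
        self.counts[entity][event_type] += 1
        self.last_seen[entity] = ts
        return first_activity

    def is_mature(self, entity, event_types):
        # mature once enough events of each tracked type have accumulated
        return all(self.counts[entity][t] >= MATURITY_THRESHOLD for t in event_types)

    def is_inactive(self, entity, now):
        # inactive if known but silent for longer than the window
        return entity in self.last_seen and now - self.last_seen[entity] > INACTIVITY_WINDOW
```

In a stateful service this state would of course have to be persisted, which is exactly the scalability problem discussed later in the article.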

Architecture

Before we move on to the architecture, it's worth explaining why we didn't use ready-made solutions like Kafka Streams, Apache Flink or Spark Streaming:

  • First, a new component in the system would require additional effort to master, configure, and support. That means extra resources and time, which we could not always afford.

  • Second, we needed flexibility in implementation, since the product was still in an active stage of development.

That's why we decided to develop our own solution, which gives us flexibility in building the architecture and lets us control everything that happens inside the services.

In the first iteration, we built the solution on a monolithic architecture, since this approach gave us high development speed and a quick MVP release, which was exactly what we needed.

The process looked like this:

We assumed that we could easily scale the services by adding instances to increase processing speed and cope with any data flow.

However, the following difficulties arose:

  • The load across task types was uneven, so adding a whole extra instance consumed more resources than the actual bottleneck required.

  • Fault tolerance was low: it was difficult to monitor the operation of all processes and restart individual components.

  • Module boundaries were hard to draw correctly, and there was no guarantee they would stay fixed as the product evolved.

So we decided to rebuild the enricher architecture to microservices before it was too late.

Problems and their solutions

Event Type Definition and Filtering
The initial solution was implemented in Python and turned out to be quite slow given the message volume. We would have had to spin up a large number of instances, which would have increased the load on Kafka and required more resources.

We moved the task from the service layer into ClickHouse and implemented it with materialized views driven by simple rules. As a result, we improved the system's performance without increasing the number of instances or putting excessive load on the infrastructure.

If a calculation can be performed inside the database, it should be.
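For illustration, the "simple rules" approach can be sketched as a table of predicates over message content. The rule set, field names, and event IDs below are invented for the example; in production the same predicates live in the WHERE clauses of ClickHouse materialized views rather than in Python:

```python
# Hypothetical rules: each event type is defined by a predicate over message fields.
RULES = {
    "windows_logon": lambda msg: msg.get("source") == "windows" and msg.get("event_id") == 4624,
    "linux_ssh":     lambda msg: msg.get("source") == "linux" and "sshd" in msg.get("process", ""),
    "router_config": lambda msg: msg.get("source") == "router" and "config" in msg.get("text", ""),
}

def classify(msg):
    """Return the first matching event type, or None to filter the event out."""
    for event_type, rule in RULES.items():
        if rule(msg):
            return event_type
    return None
```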

Timeline
As noted earlier, events must arrive for processing in a valid time order. A key part of this is sorting the incoming data.

Initially, to sort data arriving for different time periods, we decided to use a priority queue. We developed a prototype and tested it on our data. Although the solution worked, it had drawbacks. The main one was high RAM usage, which grew with the message waiting time: the longer the wait, the more memory was required and the slower the service ran.
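The priority-queue prototype can be sketched roughly like this (our simplification; class and parameter names are invented). The heap holds every event for the whole waiting window, which is exactly why memory grows with the delay multiplied by the input rate:

```python
import heapq

class ReorderBuffer:
    """Holds events until they are `delay` seconds old, then releases them
    in timestamp order. Memory usage grows with delay * input rate."""
    def __init__(self, delay):
        self.delay = delay
        self.heap = []  # (timestamp, event) pairs, min-heap by timestamp

    def push(self, ts, event):
        heapq.heappush(self.heap, (ts, event))

    def pop_ready(self, now):
        """Release, in order, all events older than the waiting window."""
        ready = []
        while self.heap and self.heap[0][0] <= now - self.delay:
            ready.append(heapq.heappop(self.heap)[1])
        return ready
```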

In the end, we settled on an idea that is fairly simple to implement. Since the data is written to ClickHouse immediately anyway, we can request it back already sorted "from the past", that is, the events that occurred two minutes ago, and send them on to Kafka.
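The "query the past" poller reduces to computing a half-open window of events that are safely older than the lag and asking the database for that slice, already sorted. A minimal sketch, assuming a two-minute lag and hypothetical table/column names (`events`, `ts`):

```python
from datetime import datetime, timedelta

LAG = timedelta(minutes=2)  # assumed: events older than this have fully arrived

def next_window(last_processed: datetime, now: datetime):
    """Half-open interval [last_processed, now - LAG) of events safe to re-emit."""
    upper = now - LAG
    return (last_processed, upper) if upper > last_processed else None

def render_query(window):
    """Build the query; the database returns the slice already sorted."""
    start, end = window
    return (
        "SELECT * FROM events "
        f"WHERE ts >= '{start:%Y-%m-%d %H:%M:%S}' AND ts < '{end:%Y-%m-%d %H:%M:%S}' "
        "ORDER BY ts"
    )
```

Each poll then advances `last_processed` to the window's upper bound, so every event is emitted exactly once.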

The takeaway is the same as above: if the database can do the work, let it.

Message size
Large message sizes became a serious problem for us, as they prevented us from achieving the required data flow processing speed. High network load led to significant delays.

We initially tried to solve the problem using standard Kafka data compression tools, but this did not produce the expected results.

We then looked at the Event-Driven Architecture (EDA) approach for inter-service communication, but a purely event-oriented approach did not suit us because of the heavy IO load on the database. We arrived at differential information transfer (diff): passing along only the part of the event payload that other services actually need.

We began to pass along only the part of the event data needed for our analytical tasks. This reduced the size of the transmitted message to 200-300 bytes. Although this significantly improved the traffic situation, it created a new problem of data aggregation, which we discuss below.
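The diff extraction itself is trivial once the downstream field set is fixed. A sketch, with an invented field list; the full event stays in ClickHouse and only the slim payload travels between services:

```python
import json

# Assumed field set: only what downstream services consume; everything else
# remains available in ClickHouse for lookup by event_id.
DIFF_FIELDS = ("event_id", "user", "host", "ip", "event_type")

def make_diff(event: dict) -> bytes:
    """Strip a large event down to the payload other services actually need."""
    slim = {k: event[k] for k in DIFF_FIELDS if k in event}
    return json.dumps(slim, separators=(",", ":")).encode()
```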

Find the balance between service performance and processing latency: assess the maximum useful volume of data worth transferring, and what the service can instead fetch from other sources, such as the database.

Scalability and fault tolerance
Our services are divided into stateless and stateful. Stateless services are lightweight and easy to scale because they keep no state between requests, which makes them ideal for distributed data processing. Stateful services are much more complex: they keep state between requests, which requires persisting that state to durable storage and synchronizing it efficiently between instances.

The state synchronization problem is solved by routing the stream to specific instances based on key event fields; we call this component the router. It splits data processing into isolated groups, which gives more efficient resource management and reduces the risk of state synchronization problems.

To ensure fault tolerance, you can run two instances: the first does the useful work, while the second polls the first about its state. If the first fails, the second spawns a worker process and takes the place of the failed service.
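The active/standby scheme reduces to a heartbeat watchdog on the standby side. A sketch under our own assumptions (a 5-second timeout, and promotion simply flipping a flag where a real standby would spawn the worker process):

```python
class Standby:
    """Standby instance: polls the primary's heartbeat and takes over when it goes stale."""
    def __init__(self, timeout: float):
        self.timeout = timeout        # assumed: seconds of silence before takeover
        self.last_heartbeat = None
        self.active = False

    def on_heartbeat(self, ts: float):
        """Called whenever a poll of the primary succeeds."""
        self.last_heartbeat = ts

    def check(self, now: float) -> bool:
        """Return True if this standby should promote itself."""
        if self.last_heartbeat is None or now - self.last_heartbeat > self.timeout:
            self.active = True  # primary silent too long: spawn the worker process here
        return self.active
```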

Design your service architecture to be scalable and resilient.

Merge diff

For the final insert we have the big original event and the diff, the enriched data produced during analysis. We knew that ClickHouse works well with materialized views and poorly with JOINs: many of our attempts were slow and ate up RAM.

On the left is a normal JOIN; for large sizes of table B, it clogs up RAM. The problem was solved by a subquery that narrows the data selected from table B for the JOIN.

Here's how it works for us. There is a Kafka-engine table in ClickHouse. We send the entire diff into a Kafka queue and set up a ClickHouse consumer to process this data. The consumer receives data from the queue, determines the volumes to process, and feeds them to a materialized view, which acts as a trigger. In this view, the JOIN is performed via subqueries filtered by event ID, which narrows the selection from the large table; such JOINs run quickly. This approach avoids the earlier performance and memory-overload issues, letting ClickHouse distribute the load optimally and use resources more efficiently.
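The shape of the subquery-reduced JOIN can be illustrated by a small query builder. The table and column names (`diffs`, `events`, `event_id`) are hypothetical; the point is that the subquery restricts the big table to just the IDs present in the incoming diff batch, so the right-hand side of the JOIN stays small:

```python
def merge_query(event_ids):
    """Build a JOIN whose subquery narrows the large events table to the
    IDs from the current diff batch (hypothetical table/column names)."""
    ids = ",".join(str(i) for i in event_ids)
    return (
        "SELECT e.*, d.* "
        "FROM diffs AS d "
        f"INNER JOIN (SELECT * FROM events WHERE event_id IN ({ids})) AS e "
        "ON d.event_id = e.event_id"
    )
```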

For reference: we also found that ClickHouse reserves 5 MB of memory per field when executing a request, which kills RAM under frequent requests.

Study the documentation: sometimes the solutions to your problems are hidden there, and finding them will significantly speed up development.

Final architecture

All services are chained sequentially: each one runs after the previous one completes. This may not be the most optimal approach, since some services could run in parallel. However, it let us avoid a final synchronization service that would have to reconcile the outputs of all the services, which would create additional load and increase the likelihood of errors.

Advantages of the current approach

Since this option provides and sustains the data flow we need, we settled on it. At the same time, each section of the pipeline can be scaled independently. If the data flow grows significantly, we can move to parallel processing by adding a synchronization service.

Thus, the consistent construction of services allows us to avoid unnecessary complexity and errors, ensuring stable and efficient data processing. This solution is flexible and can easily adapt to changes in the volume and speed of incoming data.

