architecture and basic concepts. Part 2

Let's continue our introduction to Apache Flink by taking a deep dive into memory and performance management. After reviewing the basic concepts in Part 1, we'll look at practical aspects that affect the efficiency and stability of systems.

You will learn how to improve your work with Apache Flink. In particular, we will examine the important concept of watermarks, which plays a key role in processing streaming data with timestamps.

Using RocksDB in Apache Flink

RocksDB is an embedded key-value database optimized for fast disks (SSD). It is designed to work with large amounts of data that do not fit in RAM. In Flink, RocksDB offers the following features:

  • Storing states on disk. This allows you to manage large states that do not fit into RAM, minimizing RAM consumption and reducing the risks associated with data loss during crashes.

  • Efficient use of resources. Since RocksDB stores data on disk, RAM usage is optimized, which reduces the load on the JVM's garbage collector and improves overall application performance.

Apache Flink uses RocksDB as one of the state backend options — the component that determines how and where the states of operators are saved. The state can be recovered from failure thanks to Flink mechanisms such as checkpoint and savepoint. RocksDB allows for accurate recovery of the application state, which is critical to ensuring execution accuracy in distributed systems.

To improve RocksDB performance in Flink, you can apply the following optimization strategies:

  • Fine-tuning RocksDB parameters:

    • Max open files. Managing the number of simultaneously open files can help optimize disk and RAM usage.

    • Block size and Block cache. Adjusting the block size and block cache can improve the speed of reading and writing data.

  • Regularly monitoring performance and resource usage will help identify potential bottlenecks and provide information for further optimization.

  • Proper key distribution and parallelization management can reduce the load on individual nodes and increase overall cluster performance.

Example Flink configuration for using RocksDB as a state backend:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkRocksDBExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // URI of the directory where checkpoint data is stored
        String checkpointDataUri = "hdfs://namenode:8020/flink/checkpoints";
        // Create the RocksDB state backend; "true" enables incremental checkpoints.
        // Note: since Flink 1.13 this class is deprecated in favor of EmbeddedRocksDBStateBackend.
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointDataUri, true);

        // Use RocksDB as the state backend for this execution environment
        env.setStateBackend(rocksDBStateBackend);

        // Define sources, transformations, and sinks here, then launch the job
        env.execute("Flink RocksDB Example");
    }
}

Watermark

Watermarks are special timestamps that are generated and inserted into the data stream. A watermark with time t tells the system to assume that all events with timestamps up to t have already been received. Watermarks therefore determine when an event counts as late and when time-based results, such as windows, can be finalized.

Architecture and operation of watermarks

Generating watermarks. Flink provides various strategies for generating watermarks. The main interface is WatermarkStrategy, which determines how and when watermarks are generated. Watermarks are usually generated with a bounded out-of-orderness: the watermark trails the largest timestamp seen so far by a fixed period, so events that arrive out of order within that period are still handled normally. For example, you can tolerate events that arrive up to 10 seconds out of order.
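To make the bounded out-of-orderness idea concrete, here is a minimal plain-Java sketch of the calculation. It is an illustration, not Flink's own class: the names BoundedWatermarkSketch, onEvent and currentWatermark are assumptions. The formula (largest timestamp seen, minus the bound, minus 1 ms) follows the way Flink's built-in bounded out-of-orderness generator computes its watermark.

```java
import java.time.Duration;

/** Plain-Java sketch of a bounded-out-of-orderness watermark (illustrative, not Flink's class). */
final class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    BoundedWatermarkSketch(Duration maxOutOfOrderness) {
        this.maxOutOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Called for every event: remember the largest event timestamp seen so far. */
    void onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    /** The watermark trails the largest timestamp by the configured bound (minus 1 ms). */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(Duration.ofSeconds(10));
        wm.onEvent(100_000L);
        wm.onEvent(95_000L); // out of order: does not move the watermark backwards
        System.out.println(wm.currentWatermark()); // prints 89999
    }
}
```

Note that the watermark never moves backwards: an out-of-order event only matters if its timestamp is larger than everything seen before.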

Time progress of events. Watermarks are created based on event timestamps, which can be embedded in the events themselves (for example, a creation-time field) or generated based on the time of event arrival.

Watermarks are propagated along with events in the data stream. When a watermark reaches a processing operator, the operator updates its state to indicate that all events up to the time specified in the watermark have been received.

Processing late data. If an event arrives with a timestamp that is earlier than the current watermark, it is considered late. Flink provides mechanisms to handle such data: dropping it (the default for windows), keeping windows open longer through allowed lateness, or routing late events to a side output for special handling (such as alerts).

Watermarks help manage time windows. A window closes when the watermark crosses the window boundary, which triggers processing of the events accumulated in it.
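The window mechanics can be sketched in plain Java as follows. This is an illustration with assumed names (TumblingWindowSketch, windowStart, canFire, isLate), not Flink's window operator. It assumes non-negative timestamps, no window offset, and no allowed lateness. For windowed operators, what matters in practice is whether the event's window has already closed, which is what isLate checks here.

```java
/** Plain-Java sketch of tumbling event-time windows (illustrative names, not Flink's API). */
final class TumblingWindowSketch {

    /** Start of the tumbling window that contains the timestamp (non-negative timestamps). */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** A window [start, start + size) can fire once the watermark reaches its last millisecond. */
    static boolean canFire(long windowStart, long windowSizeMillis, long watermark) {
        long maxTimestamp = windowStart + windowSizeMillis - 1;
        return watermark >= maxTimestamp;
    }

    /** An event is late for its window if that window could already fire when the event arrives. */
    static boolean isLate(long timestampMillis, long windowSizeMillis, long watermark) {
        long start = windowStart(timestampMillis, windowSizeMillis);
        return canFire(start, windowSizeMillis, watermark);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows
        System.out.println(windowStart(12_345L, size));      // prints 10000
        System.out.println(canFire(10_000L, size, 19_998L)); // prints false
        System.out.println(canFire(10_000L, size, 19_999L)); // prints true
        System.out.println(isLate(12_345L, size, 25_000L));  // prints true
    }
}
```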

Examples of using watermarks

Let's look at an example illustrating watermarking and simple event handling:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class SimpleWatermarkExample {

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data source: a stream made of two events
        DataStream<MyEvent> input = env.fromElements(
            new MyEvent("event1", 1633065600000L),  // Event 1 with its timestamp
            new MyEvent("event2", 1633065605000L)   // Event 2 with its timestamp
        );

        // Watermark strategy that tolerates events arriving up to 5 seconds out of order
        WatermarkStrategy<MyEvent> watermarkStrategy = WatermarkStrategy
            .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

        // Apply the watermark strategy to the stream
        input.assignTimestampsAndWatermarks(watermarkStrategy)
             .keyBy(MyEvent::getId)  // Group events by identifier
             .window(TumblingEventTimeWindows.of(Time.seconds(10)))  // 10-second tumbling windows
             .process(new MyProcessFunction())  // Process the events in each window
             .print();  // Print the results

        // Start the job
        env.execute("Simple Watermark Example");
    }

    // Event class with an identifier and a timestamp
    public static class MyEvent {
        private String id;
        private long timestamp;

        public MyEvent(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }

        public String getId() {
            return id;
        }

        public long getTimestamp() {
            return timestamp;
        }
    }

    // Simple window function that processes the events collected in a window
    public static class MyProcessFunction extends ProcessWindowFunction<MyEvent, String, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<MyEvent> elements, Collector<String> out) {
            for (MyEvent event : elements) {
                // Emit the event's identifier and timestamp
                out.collect("Processed event: " + event.getId() + " with timestamp: " + event.getTimestamp());
            }
        }
    }
}

Important aspects of using watermarks

  • Proper watermark generation is important for accurate and timely event processing. Incorrect settings may result in data loss or delays in processing.

  • It is important to test watermarking systems and monitor their behavior in real time to ensure that data is processed correctly.

  • The optimal out-of-orderness setting depends on the specific characteristics of your data and business requirements. Too large a bound delays results, because windows stay open longer, while too small a bound can cause late events to be dropped.
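The trade-off in the last point can be illustrated with a small plain-Java simulation. It is a sketch under stated assumptions: the class and method names are hypothetical, the timestamps are made-up sample data, windows are 10-second tumbling windows, and late events are simply counted as dropped (no allowed lateness).

```java
import java.util.List;

/** Counts late events for a given out-of-orderness bound (illustrative simulation). */
final class OutOfOrdernessTradeoff {

    /** Returns how many events arrive after the watermark has already closed their window. */
    static int countLate(List<Long> timestampsInArrivalOrder, long boundMillis, long windowSizeMillis) {
        long maxSeen = Long.MIN_VALUE + boundMillis + 1; // avoids underflow below
        int late = 0;
        for (long ts : timestampsInArrivalOrder) {
            long watermark = maxSeen - boundMillis - 1;                 // watermark before this event
            long windowEnd = ts - (ts % windowSizeMillis) + windowSizeMillis;
            if (watermark >= windowEnd - 1) {
                late++;                                                  // window already fired
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        // Made-up sample: event timestamps (ms) in the order they arrive.
        List<Long> arrivals = List.of(1_000L, 9_000L, 12_000L, 8_000L, 21_000L, 11_000L, 19_000L);
        System.out.println(countLate(arrivals, 0L, 10_000L));     // prints 3
        System.out.println(countLate(arrivals, 5_000L, 10_000L)); // prints 0
    }
}
```

With a bound of 0 three events are lost; with a 5-second bound none are, but each window now fires only after an event at least 5 seconds past its end has been seen.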

Interacting with the JVM. Memory Management in Apache Flink

Apache Flink, running on the Java Virtual Machine (JVM), uses sophisticated mechanisms to optimize memory management. Memory falls into two groups: memory that Flink manages itself and heap memory managed by the JVM's garbage collector.

Flink Managed Memory

Flink divides its memory into two categories, each dedicated to specific tasks, to improve performance and reduce latency caused by garbage collection operations.

  • Managed (operator) memory is allocated for buffers that operators use to process data. It is separate from the standard JVM heap to minimize the impact of the garbage collector on operator performance.

  • Network buffer memory is allocated for the network buffers used for communication between nodes in a cluster. Optimizing the size and use of network buffers can significantly impact distributed processing performance.

Example of network memory configuration in Flink:

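// Requires: import org.apache.flink.configuration.Configuration;
Configuration config = new Configuration();
// Legacy key names; Flink 1.10+ uses taskmanager.memory.network.min/max/fraction
config.setString("taskmanager.network.memory.min", "64mb");
config.setString("taskmanager.network.memory.max", "1gb");
config.setString("taskmanager.network.memory.fraction", "0.1");  // 10% of the total memory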
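How the three settings combine can be shown with a short calculation. This plain-Java sketch assumes the usual semantics of such a triple: the fraction is applied to the total memory and the result is clamped to the [min, max] range. The class and method names are hypothetical, and the exact base that the fraction is applied to depends on the Flink version.

```java
/** Sketch of deriving a network buffer budget from fraction, min and max (assumed semantics). */
final class NetworkMemorySketch {

    /** Applies the fraction to the total and clamps the result to [minBytes, maxBytes]. */
    static long networkMemoryBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        long byFraction = (long) (totalBytes * fraction);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long min = 64 * mib;     // "64mb"
        long max = 1024 * mib;   // "1gb"
        // Small TaskManager: 10% of 256 MiB is below the minimum, so the minimum wins.
        System.out.println(networkMemoryBytes(256 * mib, 0.1, min, max) / mib);    // prints 64
        // Large TaskManager: 10% of 20 GiB exceeds the maximum, so the maximum wins.
        System.out.println(networkMemoryBytes(20_480 * mib, 0.1, min, max) / mib); // prints 1024
    }
}
```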

Memory managed by the JVM Garbage Collector (GC)

This memory, which is managed by the JVM through garbage collection mechanisms, is used to store all Java objects, including user-defined function objects and temporary data created during program execution.

Garbage collection optimization for Flink:

  • Tuning a garbage collector such as G1 GC can help reduce latency, especially in memory-intensive applications with high performance requirements.

  • Parameters such as the initial (-Xms) and maximum (-Xmx) heap sizes must be carefully tuned to balance resource usage and performance.

Example of JVM setup for Flink:

flink-conf.yaml:
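taskmanager.heap.size: 4096m   # Assign 4 GB of memory to the TaskManager (legacy key; Flink 1.10+ uses taskmanager.memory.process.size or taskmanager.memory.flink.size)

jvm.options:
# In Flink, JVM flags are typically passed through env.java.opts in flink-conf.yaml
-XX:+UseG1GC   # Use the G1 garbage collector
-XX:MaxGCPauseMillis=100   # Target a maximum GC pause of 100 ms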

Using Data Serialization in Apache Flink

Data serialization in Apache Flink is a critical aspect that affects stream processing performance. Optimizing it can significantly reduce overhead and improve overall application performance.

Serialization is the process of converting objects into a stream of bytes for transmission over a network, storage, or distribution between different components of a system. In the context of stream processing in Flink, serialization is necessary for:

  • Data transfer between operators. In a distributed system, data is often transferred between different nodes, which requires serializing and deserializing it.

  • State management. Operator state is often serialized for persistence in external storage or to implement fault tolerance mechanisms such as checkpoints and savepoints.

Optimizing serialization to minimize overhead

Flink provides several built-in serializers that are optimized for different data types. For simple data types — int, long, double — Flink uses efficient serializers that minimize CPU usage and the amount of data transferred.
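To see why field-wise serializers reduce the amount of data transferred, here is a plain-Java comparison that uses only the JDK. It is a sketch, not Flink code: the Reading type and all method names are hypothetical. It relies on java.io.DataOutput, the interface that Flink's DataOutputView extends, and compares the result with generic Java serialization of the same object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Compares compact field-wise encoding with generic Java serialization (illustrative). */
final class FieldWiseSerializationSketch {

    /** Hypothetical record type used only for this illustration. */
    static final class Reading implements Serializable {
        private static final long serialVersionUID = 1L;
        final int sensorId;
        final String label;

        Reading(int sensorId, String label) {
            this.sensorId = sensorId;
            this.label = label;
        }
    }

    /** Writes only the field values: 4 bytes for the int, 2 length bytes plus the UTF-8 text. */
    static byte[] writeFieldWise(Reading r) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(r.sensorId);
            out.writeUTF(r.label);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order they were written. */
    static Reading readFieldWise(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new Reading(in.readInt(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Generic Java serialization also writes class metadata, so the output is much larger. */
    static byte[] writeWithJavaSerialization(Reading r) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(r);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Reading r = new Reading(7, "abc");
        System.out.println(writeFieldWise(r).length); // prints 9
        System.out.println(writeWithJavaSerialization(r).length > 9); // prints true
    }
}
```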

Let's look at an example where a custom serializer is used to optimize the performance of transferring complex objects:

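import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

// MyCustomType is assumed to have an int field1 and a String field2 with getters and setters,
// a no-argument constructor, and a (field1, field2) constructor.
public class MyCustomTypeSerializer extends TypeSerializerSingleton<MyCustomType> {

    private static final long serialVersionUID = 1L;

    private static final MyCustomTypeSerializer INSTANCE = new MyCustomTypeSerializer();

    public static MyCustomTypeSerializer getInstance() {
        return INSTANCE;
    }

    @Override
    public boolean isImmutableType() {
        return false;
    }

    @Override
    public MyCustomType createInstance() {
        return new MyCustomType();
    }

    @Override
    public MyCustomType copy(MyCustomType from) {
        return new MyCustomType(from.getField1(), from.getField2());
    }

    @Override
    public MyCustomType copy(MyCustomType from, MyCustomType reuse) {
        // Reuse the target instance instead of allocating a new object
        reuse.setField1(from.getField1());
        reuse.setField2(from.getField2());
        return reuse;
    }

    @Override
    public int getLength() {
        return -1;  // variable length
    }

    @Override
    public void serialize(MyCustomType record, DataOutputView target) throws IOException {
        target.writeInt(record.getField1());
        target.writeUTF(record.getField2());  // DataOutputView has writeUTF, not writeString
    }

    @Override
    public MyCustomType deserialize(DataInputView source) throws IOException {
        return new MyCustomType(source.readInt(), source.readUTF());
    }

    @Override
    public MyCustomType deserialize(MyCustomType reuse, DataInputView source) throws IOException {
        reuse.setField1(source.readInt());
        reuse.setField2(source.readUTF());
        return reuse;
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        // Copy one serialized record without materializing the object
        target.writeInt(source.readInt());
        target.writeUTF(source.readUTF());
    }

    @Override
    public TypeSerializerSnapshot<MyCustomType> snapshotConfiguration() {
        return new MyCustomTypeSerializerSnapshot();
    }

    // Snapshot class that Flink uses to restore this serializer from checkpoints and savepoints
    public static final class MyCustomTypeSerializerSnapshot
            extends SimpleTypeSerializerSnapshot<MyCustomType> {
        public MyCustomTypeSerializerSnapshot() {
            super(MyCustomTypeSerializer::getInstance);
        }
    }
}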

Key points when writing your own serializer:

  • efficient serialization and deserialization of object fields;

  • careful management of object instances to minimize the creation of new objects;

  • support for object reuse through the copy method.

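To enable a custom serializer in a Flink application, you need to register it with the runtime. Note that registerTypeWithKryoSerializer accepts only classes that extend Kryo's com.esotericsoftware.kryo.Serializer. A Flink TypeSerializer such as the one above is instead wired in through a custom TypeInformation (for example, with the @TypeInfo annotation and a TypeInfoFactory). The Kryo registration looks like this, where MyCustomTypeKryoSerializer is a hypothetical Kryo serializer for the same type:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// MyCustomTypeKryoSerializer (hypothetical) must extend com.esotericsoftware.kryo.Serializer<MyCustomType>
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomTypeKryoSerializer.class);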

Basic GC parameters in the context of Flink

There are several garbage collectors available in the JVM, each with its own characteristics and best uses:

  1. G1 Garbage Collector (G1 GC). Designed for memory-intensive applications with short GC pause requirements, it works by dividing memory into regions and collecting garbage at predictable times, reducing the likelihood of long pauses.

  2. Concurrent Mark-Sweep (CMS) GC. Its purpose is to minimize application pauses by doing most of the collection work concurrently with the application threads. It is suitable for applications that require fast responsiveness, but may suffer from memory fragmentation and require additional configuration to avoid memory exhaustion. Note that CMS was deprecated in JDK 9 and removed in JDK 14.

G1 GC is especially good for systems where short garbage collection pauses are critical. Key parameters:

  • -XX:+UseG1GC: enables the G1 garbage collector.

  • -XX:MaxGCPauseMillis=200: sets the target maximum GC pause duration. A value of 200 means that the JVM will try not to exceed a pause of 200 milliseconds; it is a goal, not a guarantee.

  • -XX:InitiatingHeapOccupancyPercent=45: specifies the heap occupancy at which a concurrent GC cycle begins. A value of 45 means that the cycle starts when the heap is 45% full.

  • -XX:+ParallelRefProcEnabled: enables parallel processing of references, which can speed up garbage collection.

Example JVM configuration for Flink with G1 GC:

# JVM options for tuning garbage collection in Apache Flink
java -server -XX:+UseG1GC -XX:MaxGCPauseMillis=200 \
    -XX:InitiatingHeapOccupancyPercent=45 -XX:+ParallelRefProcEnabled -jar flink-app.jar

To minimize delays caused by GC, it is important to fine-tune the operation of the garbage collector.

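GC monitoring and logging. Using the flags -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:gc.log to log GC activity will help you understand how GC affects your application's performance. These flags apply to JDK 8; on JDK 9 and later, unified logging (for example, -Xlog:gc*:file=gc.log) replaces them.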
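Besides log files, GC activity can also be read from inside the JVM through the standard java.lang.management API. The following is a minimal sketch with hypothetical class and method names; it is a generic JVM illustration and not part of Flink's metrics system.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/** Reads GC counters from the running JVM through the standard management beans. */
final class GcStatsSketch {

    /** Total milliseconds spent in GC since JVM start, summed over all collectors. */
    static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long time = gc.getCollectionTime(); // -1 if the collector does not report it
            if (time > 0) {
                total += time;
            }
        }
        return total;
    }

    /** Approximate share of JVM uptime spent in GC, between 0.0 and 1.0. */
    static double gcOverhead() {
        long uptime = ManagementFactory.getRuntimeMXBean().getUptime();
        if (uptime <= 0) {
            return 0.0;
        }
        return Math.min(1.0, (double) totalGcTimeMillis() / uptime);
    }

    public static void main(String[] args) {
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println(gc.getName()
                    + ": collections=" + gc.getCollectionCount()
                    + ", timeMs=" + gc.getCollectionTime());
        }
        System.out.println("GC overhead: " + gcOverhead());
    }
}
```

A steadily growing overhead value is a hint that the heap is too small or that the application allocates too many short-lived objects.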

Adaptive sizing. Enabling -XX:+UseAdaptiveSizePolicy helps the JVM optimize memory sizes at runtime, which can improve GC performance.

Optimizing memory usage

Using off-heap memory allows Flink to manage memory more efficiently, reducing the load on the JVM garbage collector. Off-heap memory is not considered for garbage collection, which reduces GC time and reduces the likelihood of execution delays. Benefits:

  • Data in off-heap memory is not subject to garbage collection cycles, and collection times are significantly reduced.

  • Large amounts of data can be managed efficiently without being limited by the JVM heap size.

Example Flink configuration to use off-heap memory:

Configuration config = new Configuration();
// Allocate 10 GB of off-heap memory for tasks
config.setString("taskmanager.memory.task.off-heap.size", "10gb");
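At the JVM level, off-heap storage is available through direct byte buffers. The sketch below uses only java.nio and hypothetical names; Flink has its own memory segment abstraction, so treat this as an illustration of the underlying mechanism rather than of Flink's implementation.

```java
import java.nio.ByteBuffer;

/** Stores values in a direct (off-heap) buffer instead of as objects on the JVM heap. */
final class OffHeapBufferSketch {

    /** Copies the values into a direct buffer; the payload lives outside the GC-managed heap. */
    static ByteBuffer storeOffHeap(long[] values) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(values.length * Long.BYTES);
        for (long v : values) {
            buffer.putLong(v);
        }
        buffer.flip(); // switch from writing to reading
        return buffer;
    }

    /** Sums the stored values without changing the caller's buffer position. */
    static long sum(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // independent position and limit, same memory
        long total = 0;
        while (view.remaining() >= Long.BYTES) {
            total += view.getLong();
        }
        return total;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = storeOffHeap(new long[] {1L, 2L, 3L});
        System.out.println(buffer.isDirect()); // prints true
        System.out.println(sum(buffer));       // prints 6
    }
}
```

The garbage collector tracks only the small ByteBuffer wrapper object, not the bytes it points to.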

Scalability on multithreaded systems

To ensure scalability and high performance of Flink applications, it is especially important to effectively use multithreading and parallel execution in the JVM.

Flink is optimized for parallel data processing using multiple threads. Adjusting the level of parallelism allows you to increase the overall throughput and reduce the time it takes to process data. In addition, Flink uses cluster resources to distribute data processing tasks across multiple nodes, which increases the scalability and resilience of the application.

Example of setting up parallelization in Flink:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // Set the default parallelism to 4

DataStream<String> dataStream = env.readTextFile("path/to/data");
dataStream.flatMap(new MyFlatMapFunction()).setParallelism(10); // Set the parallelism for a specific operator

Recommendations for optimizing multithreading:

  • Tuning the thread pool size according to the number of available CPU cores can significantly improve performance.

  • Optimizing code to minimize locks and resource contention improves scalability and performance.
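Both recommendations can be illustrated with a small, generic JVM example. It is a sketch with hypothetical names that applies to user code managing its own threads, not to Flink's internal scheduling: the pool is sized to the number of available cores, and a LongAdder replaces a lock-protected counter to reduce contention.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/** Counts in parallel with a core-sized thread pool and a low-contention counter. */
final class ContentionSketch {

    static long countInParallel(int tasks, int incrementsPerTask) {
        // Size the pool to the number of cores the JVM can use.
        int threads = Math.max(1, Runtime.getRuntime().availableProcessors());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // LongAdder spreads updates over internal cells, so threads rarely contend.
        LongAdder counter = new LongAdder();
        try {
            for (int t = 0; t < tasks; t++) {
                pool.submit(() -> {
                    for (int i = 0; i < incrementsPerTask; i++) {
                        counter.increment();
                    }
                });
            }
        } finally {
            pool.shutdown(); // already submitted tasks still run to completion
        }
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(countInParallel(8, 10_000)); // prints 80000
    }
}
```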

Profiling and Performance Optimization in Apache Flink

Profiling and subsequent performance optimization are key aspects of improving the efficiency of applications developed on top of Apache Flink. This is especially important when running on the JVM, where proper resource management and execution optimization can significantly speed up processing and increase throughput.

Profiling a Flink Application

This helps identify performance bottlenecks such as:

  • delays caused by garbage collection;

  • inefficient use of memory;

  • problems with multithreading and contention;

  • delays in network data transmission.

There are various tools you can use to profile Flink applications, including:

  • VisualVM. One of the most popular tools for monitoring the execution of Java applications. Provides data on memory consumption, CPU load, threads and garbage collection.

  • JProfiler. A commercial tool for profiling Java applications. Provides detailed data on performance, memory, threads, and more.

  • Java Flight Recorder (JFR). Included in the JDK. Collects detailed data about application execution without significantly impacting performance.

Suppose profiling a Flink application reveals the following issues:

  • Full GCs are performed frequently, which take a significant amount of time and impact performance.

  • Certain operators consume an unreasonably large amount of memory, which leads to frequent heap overflows and activation of Full GC.

  • Some cluster nodes are overloaded while others are underloaded, indicating load balancing issues.

Based on the profiling data, you can select a more suitable garbage collector and configure its parameters:

  • Switch to G1 GC, which provides more predictable pause times thanks to incremental garbage collection. To do this, you can add the following JVM parameters:

    -XX:+UseG1GC
    -XX:MaxGCPauseMillis=200   # Target maximum pause duration
    -XX:InitiatingHeapOccupancyPercent=30   # Start the GC cycle when the heap is 30% full

  • Increasing the heap size can help reduce the frequency of garbage collections:

    -Xms8g -Xmx8g   # Set the initial and maximum heap size to 8 GB

Memory profiling of operators allows you to find out which of them consume the most memory. Based on this data, you can apply optimizations: change algorithms or data structures to use memory more efficiently.

Increasing the level of parallelism to distribute work more evenly across cluster nodes can significantly improve overall performance:

env.setParallelism(10);  

Using the rebalance() or rescale() methods to distribute data evenly across subtasks also helps reduce the overload of individual nodes and improve overall performance:

dataStream.rebalance();  // Redistribute the stream evenly across downstream subtasks
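Why round-robin redistribution helps with skew can be shown with a simplified plain-Java model. The names are hypothetical, and the key-based variant uses a plain hashCode modulo rather than Flink's actual key-group assignment, so the sketch only illustrates the principle.

```java
import java.util.Arrays;

/** Compares key-based partitioning with round-robin distribution (simplified model). */
final class RebalanceSketch {

    /** Round-robin: record i goes to subtask i % parallelism, regardless of its content. */
    static int[] roundRobinCounts(int recordCount, int parallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < recordCount; i++) {
            counts[i % parallelism]++;
        }
        return counts;
    }

    /** Key-based: all records with the same key land on the same subtask, so hot keys pile up. */
    static int[] keyedCounts(String[] keys, int parallelism) {
        int[] counts = new int[parallelism];
        for (String key : keys) {
            counts[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] skewedKeys = {"a", "a", "a", "a", "b"};
        System.out.println(Arrays.toString(keyedCounts(skewedKeys, 2)));             // prints [1, 4]
        System.out.println(Arrays.toString(roundRobinCounts(skewedKeys.length, 2))); // prints [3, 2]
    }
}
```

Round-robin distribution gives up key locality, so it fits stateless steps; keyed state still requires partitioning by key.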

Conclusion

Apache Flink is a powerful tool for processing streaming data in real time. Effective memory management and performance tuning are key aspects for stable operation of Flink-based systems. The concept of watermarks plays an important role in processing time-stamped data, allowing for proper latency management and event ordering. Proper tuning of these elements allows for increased efficiency and reliability of systems, ensuring accurate and fast data processing.
