What’s new in Apache Spark 3.2.0 – RocksDB state store


This is an important event for all Apache Spark Structured Streaming users: RocksDB is now available as a state store backend in vanilla Spark!

Initialization

To start using the RocksDB state store, you must explicitly define the provider class in your Apache Spark configuration: .config("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") – a fuller configuration sketch follows the list below. Later, the createAndInit method of the StateStore companion object initializes the provider instance and calls the init method of RocksDBStateStoreProvider, where the provider:

  • remembers the key and value schemas of the state

  • creates an appropriate RocksDBStateEncoder – currently there are two encoders: PrefixKeyScanStateEncoder and NoPrefixKeyStateEncoder. Why two? Because Apache Spark 3.2.0 brought us another interesting feature called prefix scan, which optimizes scanning a certain range of keys (I will write more about this in a couple of weeks). This feature is optional and only one of the encoders supports it. Whether or not it is supported depends on the key storage format. NoPrefixKeyStateEncoder stores the entire key, while PrefixKeyScanStateEncoder splits the key into two parts: the prefix and the rest. This format also stores the length of the key prefix, but not the length of the rest, since it is computed dynamically at deserialization as keyBytes.length - 4 - prefixKeyEncodedLen.
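Coming back to the configuration mentioned above, here is a minimal sketch of how it could be wired into a streaming job. Only the providerClass setting comes from this article; the application name, the rate source, the console sink and the checkpoint path are illustrative placeholders:

import org.apache.spark.sql.SparkSession

// Minimal illustrative setup: the providerClass entry is the important part.
val spark = SparkSession.builder()
  .appName("rocksdb-state-store-demo") // hypothetical application name
  .master("local[*]")
  .config(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  .getOrCreate()

// A stateful query (a simple count per value) so that the state store is
// actually exercised; source, sink and checkpoint path are placeholders.
val counts = spark.readStream
  .format("rate")
  .load()
  .groupBy("value")
  .count()

val query = counts.writeStream
  .format("console")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/rocksdb-demo-checkpoint")
  .start()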

As for initialization, that’s all for now. Much more of the fun will happen later, when Apache Spark finally uses the state store in the state commit operation.

CRUD operations (Create, Read, Update, Delete)

The RocksDB state store uses the same read/write API as the HDFS-backed version. First, the load(version: Long) method loads the corresponding version of the store. The version corresponds to the sequence number of the last microbatch, and the function starts by comparing the requested version with the one currently loaded. If they differ, the execution context is probably reprocessing and there is no valid local data, so the state store restores the database from a checkpoint using RocksDBFileManager. Since I’ll cover the topic of recovery in the last section, let’s move on directly to reading/writing.
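As a rough illustration of that version check, here is a simplified, self-contained sketch; it is not Spark’s actual implementation, and the class and callback names are made up:

class HypotheticalStateStore(restoreFromCheckpoint: Long => Unit) {
  // Version of the microbatch whose state is currently present locally.
  private var loadedVersion: Long = -1L

  def load(version: Long): Unit = {
    if (loadedVersion != version) {
      // Local data is missing or belongs to another microbatch
      // (e.g. after a restart or reprocessing), so rebuild it from the checkpoint.
      restoreFromCheckpoint(version)
      loadedVersion = version
    }
  }
}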

For obvious reasons, Apache Spark uses the RocksDB API to execute them. In the state store implementation you will find RocksDB classes such as:

  • ReadOptions – uses all default options for reading data from a RocksDB instance. This may come as a surprise, but Apache Spark uses it not only when returning states to the caller, but also when writing them in the put and remove methods. Why? To track the number of keys in a given version: the put operation increments the counter, and the remove operation decrements it:

def put(key: Array[Byte], value: Array[Byte]): Array[Byte] = {
  // Look the key up in the pending batch and the database to know whether
  // it is a new key, and to return the previous value to the caller.
  val oldValue = writeBatch.getFromBatchAndDB(db, readOptions, key)
  writeBatch.put(key, value)
  if (oldValue == null) {
    numKeysOnWritingVersion += 1
  }
  oldValue
}

def remove(key: Array[Byte]): Array[Byte] = {
  val value = writeBatch.getFromBatchAndDB(db, readOptions, key)
  if (value != null) {
    writeBatch.remove(key)
    numKeysOnWritingVersion -= 1
  }
  value
}
  • WriteOptions – sets the sync flag to true, which means that a flush from the OS buffer cache to disk must be performed before the write operation is considered complete. This slows down writing, but ensures that no data is lost in the event of a machine failure. Apache Spark uses it in the state store commit.

  • FlushOptions – sets waitForFlush to block until the flush operation terminates. It is also used in the state store commit.

  • WriteBatchWithIndex – configured with overwriteKey enabled, so that any existing keys in the database get replaced. It is a write interface for RocksDB, but because it is searchable, it also participates in read operations, including those mentioned above in the context of writing.

  • BloomFilter – used as the filter policy to reduce the number of disk reads when looking up keys. A sketch showing how these options are typically constructed with the RocksDB Java API follows this list.
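The following sketch constructs the options described above with the RocksDB Java API (rocksdbjni). It mirrors the settings mentioned in the list rather than Spark’s actual code, and assumes a rocksdbjni version that exposes setFilterPolicy:

import org.rocksdb._

RocksDB.loadLibrary()

// Defaults for reads, as described above.
val readOptions = new ReadOptions()

// sync = true: the write is only acknowledged once it reaches disk.
val writeOptions = new WriteOptions().setSync(true)

// Block until the flush of the memtable terminates.
val flushOptions = new FlushOptions().setWaitForFlush(true)

// overwriteKey = true: a later put on the same key replaces the earlier one.
val writeBatch = new WriteBatchWithIndex(true)

// Bloom filter as the table filter policy, to limit disk reads on lookups.
val tableConfig = new BlockBasedTableConfig().setFilterPolicy(new BloomFilter())
val dbOptions = new Options()
  .setCreateIfMissing(true)
  .setTableFormatConfig(tableConfig)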

Compared to the default state store, the RocksDB implementation is very similar in terms of CRUD operations, except for one important difference. Recall that the delete operation in the default state store is materialized through checkpointed delta files. This is not how it works in RocksDB – the state is simply removed from the database. There is also a difference in the elements for which checkpoints are created, presented below.

Storage

In the following diagram, you can see the files that the RocksDB state store operates on:

There are three “spaces”. The left-most one contains all the files used by the current RocksDB database instance: the working directory holds the instance’s data files, SSTables, and logs. The checkpoint space is the directory created by Apache Spark when the version of the current microbatch is committed (see below); it contains all the files that will be checkpointed. Finally, the file manager space is a directory reserved for the activity of RocksDBFileManager. The manager uses it mainly during the cleanup phase, to store the uncompressed content of the checkpointed files.

As far as the checkpoint space is concerned, you will find there the RocksDB files (logs, SST files, the archived log directory), some metadata with the state store schema, and zip files storing the other files needed for the restore process, such as the RocksDB options or the mapping between the checkpointed files and their local names.

Fault tolerance

The commit from the state store API is launched after processing all the input rows for the given partition, in order to materialize the state store at the version of the current microbatch. In RocksDB, this operation includes the following steps:

  • creating empty and temporary checkpoint directories in RocksDB space

  • writing all the version’s updates to the RocksDB database – this step calls the write method of the RocksDB API. But wait a minute, the CRUD part uses the put and remove functions of the API. What is the difference then? The state store uses WriteBatchWithIndex, where writes are buffered in a WriteBatch instead of hitting the database directly. Calling the write operation ensures they are applied to the database atomically (see the commit sketch after the metadata file example below).

  • flushing all data from memory – writes first go to an in-memory structure called the memtable. When the memtable is full, RocksDB writes its contents to SST files stored on disk. Flushing ensures that the checkpointing process will cover the entire dataset.

  • compacting the state store, if enabled – this process consists of removing deleted or overwritten key-value pairs and reorganizing the data to improve query efficiency. It also merges small data files into larger ones.

  • stopping all ongoing background operations to avoid file inconsistencies during commit due to simultaneous actions on the same files,

  • creating a RocksDB checkpoint – a built-in RocksDB feature that creates a snapshot of a running instance in a separate directory. If the checkpoint and the database use the same filesystem, as is the case with Apache Spark, the checkpoint only creates hard links to the database’s SSTable files.

  • the RocksDBFileManager instance takes all the checkpoint files and copies them into the checkpoint space. There are a few subtle things about this copy. For starters, there are two types of files. The first category covers the archived logs and the relevant SST files; the checkpointing process simply copies them into the checkpoint space. Another interesting fact follows from this: RocksDB can reuse SST files from a previous microbatch. When this happens and these files match the ones already copied, the checkpointing process does not copy them again. Instead, it only references them in the metadata file, which lists all the SST files related to a given microbatch. You can find the contents of this file below:

v1
{"sstFiles":[{"localFileName":"000009.sst","dfsSstFileName":"000009-1295c3cd-c504-4c1b-8405-61e15cdd3841.sst","sizeBytes":1080}],"numKeys":2}

In addition to these SST files and logs, the checkpointing process takes the metadata files (MANIFEST, CURRENT, OPTIONS), a log file, and the aforementioned metadata file that Spark maintains, and compresses them into a single zip archive. A short overview of this process is shown in the diagram below:

When it comes to restoring the state store in the load method, the RocksDB state store delegates this action to RocksDBFileManager. During recovery, the manager downloads the zip file and unpacks it into the local RocksDB working directory. It then iterates over the sstFiles listed in the metadata and copies them from the checkpoint space back to the RocksDB directory. It is worth noting that all the copied files are stored locally under their correct names, i.e. the UUID-based name is replaced by the localFileName attribute.

After completing this copy operation, Apache Spark has all the files needed to restart the previous RocksDB instance. This is done by pointing the initialization call at the working directory: NativeRocksDB.open(dbOptions, workingDir.toString)
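A simplified, self-contained restore sketch based on this description is shown below. The SstFileRef case class mirrors the sstFiles entries from the metadata file; the directory paths and helper names are made up for the example:

import java.nio.file.{Files, Paths, StandardCopyOption}
import org.rocksdb.{Options, RocksDB}

// Mirrors one entry of the "sstFiles" list from the metadata file.
case class SstFileRef(localFileName: String, dfsSstFileName: String)

def restore(workingDir: String, checkpointSpace: String, sstFiles: Seq[SstFileRef]): RocksDB = {
  sstFiles.foreach { ref =>
    // Copy the checkpointed file back under its local name
    // (the UUID-based name is replaced by localFileName).
    Files.copy(
      Paths.get(checkpointSpace, ref.dfsSstFileName),
      Paths.get(workingDir, ref.localFileName),
      StandardCopyOption.REPLACE_EXISTING)
  }
  RocksDB.loadLibrary()
  // Reopen the instance on the restored working directory.
  RocksDB.open(new Options().setCreateIfMissing(true), workingDir)
}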

RocksDB is an interesting alternative to the default heap-based state store implementation, optimized for handling a large number of states. However, it is a little harder to grasp, since it requires familiarity with a new technology. Fortunately, as an end user, you may not need to know all these details, but if you do, I hope this article helps you!


