CDC and logical replication for databases implemented on an open source stack

In this article we will look at how the Change Data Capture pattern and inter-cluster data replication are implemented in Platform V DataGrid, a distributed in-memory database for high-performance computing, and discuss the implementation details and the available replication options. Our colleague Nikolai Izhikov from the open source database development team helped write this material.

What is Change Data Capture

Imagine that you have a database with business-critical data and strict read/write SLAs.

At the same time, there are system components that need to respond to changes in data, such as notifications of a new order, a change in user data, etc. There are many such components, and their number only grows over time.

If the database is already under load, bolting synchronous processing onto the microservice that creates the order, or writing a stored procedure, is a poor idea: response times will grow, and there is a risk of violating the SLA.

And here the Change Data Capture or CDC pattern comes to the rescue.

When changes are made, the database writes them to a change log (UNDO/REDO logs) to speed up writes and optimize operations: it sequentially appends the delta of every change you make.

CDC is an application that can process change logs, extract data change events from them, and notify a change consumer that implements business logic about this.

As a result, we get a linear, time-ordered stream of data-change events: what the value was, what it is now, which operation was performed, and on which table or cache. Processing is asynchronous, unlike a trigger or a stored procedure: the data is committed, the application that wrote it keeps working, and the consumer receives a notification about the change some time later.

How and when to use CDC

  • Streaming changes to a data warehouse. You have a DWH, and usually data should not flow into it in real time. To transfer data, you can write procedures that compute the delta once an hour or once a day and send it to the warehouse. With CDC, the same data can be transferred with far less delay.

  • Event post-processing. An event has occurred in the system — the user has registered, created an order, uploaded a new file — and, according to the business process, moderation or other actions must be initiated on a new record.

  • Analytics. The events arriving through CDC can be used to compute analytics in near real time.

  • Logical replication. With CDC we have ALL the changes that happen in the database at hand. To implement replication, you only need to reliably replay them on a replica.

CDC in an open source database

Design

With any refinement of a complex system, which obviously includes a distributed DBMS, there is always a risk of breaking something. The best way out is to make a new feature without touching the existing ones at all.

Therefore, when designing CDC on top of Ignite, the team decided that ignite-cdc should run as a separate Java process that does not affect the Ignite node.

Ignite in persistence mode, like any classic DBMS, writes changes to the WAL (Write-Ahead Log). The WAL is a binary file containing the change deltas that are periodically applied to the main storage (page memory).

From time to time, a WAL segment goes into the archive. Ignite-cdc sees that an archived WAL segment has arrived and processes it.

Processing means notifying a consumer of the changes. The consumer has a public API, and you can write your own implementation.

It is important that this does not waste disk space: the WAL archive is the existing functionality that is needed for disaster recovery. Ignite-cdc processes exactly the same segments, no new data appears on the disk.

The next important point is the ability to save the read state. Ignite-cdc is a separate process and can crash, so its state must be committed every time the consumer decides that a batch of data has been processed. After a crash, processing then continues from the last commit. Fortunately, this is easy to support: all you need is to keep a pointer to the position in the segment where reading stopped.
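A minimal sketch of this idea, assuming nothing about Ignite's actual on-disk format: persist a "segment + offset" pointer atomically, so a restarted process can resume from the last commit. All names here are illustrative, not Ignite's real classes.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: commit the "last processed position" so a restarted
// CDC process can resume from the last commit. Not Ignite's actual code.
class CdcStatePointer {
    private final Path file;

    CdcStatePointer(Path file) { this.file = file; }

    // Commit the pointer atomically: write a temp file, then move it into place,
    // so a crash mid-write never leaves a half-written state file behind.
    void commit(long segmentIdx, long offset) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(segmentIdx).putLong(offset);

        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, buf.array());
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING,
                StandardCopyOption.ATOMIC_MOVE);
    }

    // Restore the pointer after a restart; {0, 0} if nothing was committed yet.
    long[] restore() throws IOException {
        if (!Files.exists(file))
            return new long[] {0, 0};

        ByteBuffer buf = ByteBuffer.wrap(Files.readAllBytes(file));
        return new long[] {buf.getLong(), buf.getLong()};
    }
}
```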

The ability to save state in turn makes a fail-fast design possible: on any problem, Ignite-cdc simply crashes, and it is assumed that the OS will restart it.

At the node level, it looks like this:

There is a small subtlety: the WAL archive is not infinite; Ignite keeps as many segments in the archive as specified in the settings. When the (n+1)-th segment is archived, the oldest one is deleted.

To avoid a situation where CDC falls behind and has not yet processed an already deleted segment, each archived segment is hard-linked into a folder that only Ignite-cdc works with.

If we delete the data from the archive, the file will remain in the CDC folder and the data will be available.

If Ignite-cdc has processed a segment, it can be deleted immediately. The data will disappear from the disk when both hard-links are removed.
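The hard-link semantics this relies on can be shown with a few lines of standard Java (file names are made up for the illustration): the file's data survives as long as at least one link to it remains.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustration of the hard-link trick: the WAL archive and the CDC folder
// both link to the same file data, so deleting one link does not delete it.
class HardLinkDemo {
    static String readAfterArchiveDelete(Path dir) throws IOException {
        Path archive = dir.resolve("wal-0001.seg");      // "WAL archive" copy
        Path cdcLink = dir.resolve("cdc-wal-0001.seg");  // link for ignite-cdc

        Files.writeString(archive, "segment-data");
        Files.createLink(cdcLink, archive); // second hard link, no data copied

        Files.delete(archive); // archive rotation removes its link...

        return Files.readString(cdcLink); // ...but CDC can still read the data
    }
}
```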

The application will need metrics. Ignite already has a metrics API, so it is reused.

API and Settings

CDC is configured with three parameters at the node level:

public class DataStorageConfiguration {
    long walForceArchiveTimeout;
    String cdcWalPath;
}

public class DataRegionConfiguration implements Serializable {
    boolean cdcEnabled;
}

Here:

  • cdcWalPath – path to the folder where WAL segments for CDC are added;

  • cdcEnabled – enables CDC for DataRegion;

  • walForceArchiveTimeout — segment forced archiving timeout: even if the segment is not completely filled, it will be archived by timeout and become available for CDC.
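Putting the three settings together, a node configuration might look roughly like this. This is a sketch, not a definitive configuration: the setter names are assumed to mirror the fields shown above, so check them against your Ignite version.

```java
// Sketch: wiring the three CDC settings into an Ignite node configuration.
IgniteConfiguration cfg = new IgniteConfiguration();

DataStorageConfiguration storageCfg = new DataStorageConfiguration();
storageCfg.setCdcWalPath("/var/ignite/cdc-wal");  // folder ignite-cdc watches
storageCfg.setWalForceArchiveTimeout(5_000);      // force-archive a segment at least every 5 s

DataRegionConfiguration regionCfg = new DataRegionConfiguration();
regionCfg.setCdcEnabled(true);                    // capture changes for this data region

storageCfg.setDefaultDataRegionConfiguration(regionCfg);
cfg.setDataStorageConfiguration(storageCfg);
```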

There is a subtlety with walForceArchiveTimeout. WAL writing is fast because the segment is a memory-mapped file: writes actually go to memory, not to disk, and the file is flushed either by the operating system or manually when the segment is full.

Writing to disk is an expensive operation that momentarily reduces the node's performance, so on the one hand flushes should happen as rarely as possible. On the other hand, CDC learns about changes only after a segment is archived, so flushes should happen as often as possible. A contradiction 🙂

You can resolve it by choosing a timeout that matches the application's requirements.

Now for the most interesting part: the consumer, a listener that lets you receive and process changes:

public interface CdcConsumer {
   public void start(MetricRegistry mreg);
   public boolean onEvents(Iterator<CdcEvent> events);
   public void onTypes(Iterator<BinaryType> types);
   public void onMappings(Iterator<TypeMapping> mappings);
   public void stop();
}
  • start, stop – for initialization and stop;

  • onEvents – callback for processing changes; returning true commits the state;

  • onTypes, onMappings – callbacks for handling changes to meta information about types.

What is available in the event:

public interface CdcEvent extends Serializable {
   public Object key();
   @Nullable public Object value();
   public boolean primary();
   public int partition();
   public CacheEntryVersion version();
   public int cacheId();
}
  • key, value – the data; value is null when the event is a remove;

  • primary – the event occurred on primary or backup;

  • partition – partition number, needed to distribute the load in accordance with the existing partitions in Ignite;

  • version – entry version;

  • cacheId – cache identifier.
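As an illustration of how the two interfaces fit together, here is a minimal consumer that counts puts and removes. To keep the sketch self-contained it uses a simplified local stand-in for the event type; real code would implement org.apache.ignite.cdc.CdcConsumer and consume org.apache.ignite.cdc.CdcEvent instances.

```java
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for the CdcEvent interface shown above,
// so this sketch compiles without Ignite on the classpath.
interface Event {
    Object key();
    Object value();     // null for a remove
    boolean primary();
    int partition();
    int cacheId();
}

// A consumer that counts puts and removes on primary copies only.
// Returning true from onEvents tells ignite-cdc to commit its state.
class CountingConsumer {
    int puts, removes;

    boolean onEvents(Iterator<Event> events) {
        while (events.hasNext()) {
            Event evt = events.next();

            if (!evt.primary())
                continue; // skip backup copies to avoid double-counting

            if (evt.value() == null)
                removes++;
            else
                puts++;
        }
        return true; // whole batch handled: safe to commit the WAL pointer
    }

    // Tiny helper to fabricate events for local experiments.
    static Event event(Object key, Object val, boolean primary) {
        return new Event() {
            public Object key() { return key; }
            public Object value() { return val; }
            public boolean primary() { return primary; }
            public int partition() { return 0; }
            public int cacheId() { return 0; }
        };
    }
}
```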

Thus, we have an application that asynchronously receives notifications of all changes to all data within the Ignite cluster. Now, based on this functionality, we can implement both the necessary business functions and logical replication.

Logical replication with CDC

By physical replication in this article I mean transferring the internal representation of data (memory pages, etc.) between database instances.

By logical replication, I mean extracting the stream of changes from the source database and replaying it on the destination database.

CDC allows you to implement logical replication.

Ignite has support for two schemas: Ignite to Ignite and Ignite to Kafka.

Ignite to Ignite

Inside Ignite-cdc runs IgniteToIgniteCdcStreamer, which, by the way, is available out of the box. This consumer starts an Ignite client node inside itself, connects to the destination cluster and, as changes arrive, applies them there with what is essentially an ordinary put operation.

If the source cluster is unavailable, for example because a node went down, Ignite-cdc will wait for the node to start for as long as it takes. New data will not arrive, and the process will keep processing the changes generated while the node was still alive.

If Ignite-cdc itself crashes then, first, it is still alive on all the other nodes. Second, after a while the operating system restarts it; CDC then checks which changes it has already processed and continues sending the rest to the neighboring cluster.

If the neighboring cluster or network connectivity is lost, Ignite-cdc also crashes, and after a restart it reconnects to the destination cluster. If the cluster is still unavailable, it crashes again; if it is available, CDC starts sending it the changes accumulated in the WAL on disk. The disk acts as a buffer that accumulates changes until they can be processed and delivered to the right place.

Ignite to Kafka

This replication option is for situations where the Ignite clusters cannot reach each other directly, where Kafka must be used as a transport, or where there are multiple readers of the events.

The scheme is almost the same: the IgniteToKafkaCdcStreamer streamer processes the events and distributes the data across Kafka partitions according to the Ignite partitions.
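The point of a partition-aware mapping is that all events from one Ignite partition land in the same Kafka partition, preserving per-key ordering. A minimal sketch of such a mapping; the modulo scheme for differing partition counts is an assumption, not necessarily what IgniteToKafkaCdcStreamer does internally.

```java
// Sketch: deterministic Ignite-partition -> Kafka-partition mapping.
// Events from the same Ignite partition always map to the same Kafka
// partition, so their relative order is preserved in the topic.
class PartitionMapper {
    static int kafkaPartition(int ignitePartition, int kafkaPartitions) {
        return ignitePartition % kafkaPartitions;
    }
}
```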

On the receiver side, there is a kafka-to-ignite application – it reads data from Kafka and puts it into the receiving Ignite cluster.

Conflict resolver

Now we come to the most interesting part: what happens if one key is changed on both clusters?

The answer: a conflict resolver kicks in. This is an interface that determines exactly which data should end up in the cluster: it can take the "old" value, take the "new" value, or perform a merge.

The CDC extension provides a default implementation, but you can write your own. Note that there is no universally correct way to resolve change conflicts: without knowing anything about the data, it is impossible to determine which change is right and which is not.

Key properties of the default implementation:

  1. If the change happened on the “local” cluster, it wins.

  2. Changes from the same cluster are compared by version. A change with a larger version wins.

  3. If a comparison field is specified, records are compared by that field.

  4. If none of the previous rules apply, the new entry is discarded: the data on the clusters diverges, a warning appears in the logs, and you need to decide what to do next.

Conclusion

The introduction of the CDC pattern made it possible to add the required functionality for implementing event subscriptions and creating replicas without affecting the performance of the database engine itself.
