A quest to synchronize analytical and operational storage in real time without loss on hundreds of terabytes of data

I work at STM Labs, where we build large, highly loaded Big Data systems. This article is based on my presentation at the conference Saint Highload 2023. I want to tell you a fascinating story about how we were looking for the best solution for synchronizing analytical and operational storage in real time. It was important for us to do this without loss, because hundreds or more terabytes of data were at stake.

Let me say right away what I will not do in this article:

• I will not talk in detail about the types of DBMS and their differences.
• I will not review analytical DBMSs. Here everyone chooses for themselves.
• I will not dwell in detail on the architecture, fault tolerance and scaling of the MongoDB DBMS.
• I will not review the differences between OLAP and OLTP.
• I will not review or compare CDC implementations in different DBMSs.

Before starting this exciting journey, let's take a short overview of the subject area in order to fundamentally understand the semantics of the data, the load model, and so on.

So, the subject area is a large, in fact global, Track&Trace system that provides a unified process for tracking the status of various groups of goods. The system implements per-item control of the movement of goods through supply chains, starting from the production process. Then come logistics and distribution, and everything ends with retail, write-off, or another final state that the product reaches at the end of its life cycle.

A few words about the architecture of the processing itself

This is not the subject of this article, but to follow it more easily it is important to understand how processing works in our solution in principle.

So, we have a certain source of events or documents about the movement of goods in supply chains. These events arrive in an incoming queue built on top of the Apache Kafka broker. Processors read messages from the queue and perform some business logic based on this data. Business logic is, in fact, simply a change in the state of a product as it moves along the commodity distribution chain within the framework of a certain model described by a complex finite state machine.

As a product moves, its current owner, status, and other attributes change. Well, the rightmost element in the architecture is the operational storage, which reflects the current state of the product.
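
As a very rough sketch of the idea (this is not our actual processor; the topic, collection, transition table and field names below are purely illustrative, and events are assumed to arrive as JSON):

from kafka import KafkaConsumer
from pymongo import MongoClient
import json

# Illustrative finite state machine: (current state, operation) -> new state.
TRANSITIONS = {("PRODUCED", "ship"): "IN_TRANSIT", ("IN_TRANSIT", "sell"): "SOLD"}

consumer = KafkaConsumer("movement-events", bootstrap_servers="kafka:9092",
                         value_deserializer=json.loads)
products = MongoClient("mongodb://mongo:27017")["prod"]["product"]

for msg in consumer:
    event = msg.value                                   # a movement document from the incoming queue
    product = products.find_one({"_id": event["product_id"]})
    if product is None:
        continue                                        # unknown product: out of scope for this sketch
    new_state = TRANSITIONS.get((product["state"], event["operation"]))
    if new_state:                                       # transition permitted by the state machine
        products.update_one({"_id": product["_id"]},
                            {"$set": {"state": new_state, "owner": event.get("owner")}})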

We use MongoDB as operational storage.

The task sounds as simple as possible, but in reality it is anything but simple. We are faced with the task of synchronizing operational and analytical data. Let's be honest: which of us even has a separate analytical cluster? I am sure that some readers understand all the pain and, most importantly, the reasons why you should not run analytical queries on operational data.

Our synchronization requirements look like this:

The first and most important requirement is, of course, reliability. Any synchronization must be reliable, like a Swiss watch: the data must reconcile to zero, with no losses!

We should also at least try to ensure minimal lag between the data in the analytical cluster and the operational data.

Plus, we need to provide high fault tolerance for the entire solution and make sure that our synchronization does not create additional parasitic load on the operational storage, or at least minimize such costs. This is especially important during periods of high or peak load on the system.

The load model looks like this:

  1. The write load is 10 thousand operations per second.

  2. The reading load is a little higher, 15–20 thousand operations per second.

  3. The dataset size is 150+ terabytes.

The Samurai's Long Journey

Our path as a samurai was long, as many attempts were made to implement different approaches. The whole epic lasted a little over a year, but we finally came to a target solution that completely suits us.

We started the same way everyone else starts. We simply had one operational storage, and we sent all analytical requests directly there. Bravery and stupidity! As you understand, this did not end well: with the growth of data volumes, at one point everything simply stopped working.

Then we finally raised a separate analytical cluster. The first thing we implemented was interval uploads, to make data transfer more controllable. However, this solution did not suit us, because data losses regularly occurred during such transfers. We had to deal with all sorts of reconciliations, refills… In general, it was absolute hell! And then we firmly decided that something needed to change. As you understand, this was a spoiler, and the most interesting things are yet to come. So the journey begins…

Iteration number one. Direct queries to operational storage

Yes, everyone starts with this… It is clear that in pursuit of functionality no one thinks about architecture, especially at the start of the project.

Obviously, large volumes of operational data do not appear all at once. My point is that we can afford this approach at the initial stage of a project, or even as the target option when the volumes are small – if it is not Big Data or some other highly loaded solution. In that case you can live with it forever and never bother building a separate analytical cluster.

So, at the start of the project, we simply took and actually landed all analytical queries directly into the operational storage. Parasitic load?! No, we haven't heard 🙂

The advantages of this approach are obvious:

• it is really cheap and cheerful, since a separate analytical cluster is not needed;
• you can quickly create new reports for your business, literally baking them like hot cakes.

The disadvantages, in principle, are also obvious:

• as data volumes grow, the impact on the performance of the entire system increases;
• analytical queries create a parasitic load on the operational storage;
• new reports require the creation of additional indexes. And so on, like a snowball… This has a strong impact on data insertion.

Well, the most terrible thing, in my opinion, is the mixing of OLAP and OLTP queries. We are all adults and have not believed in the existence of Santa Claus and the Silver Bullet for a long time. Likewise, any database has its own specialization and is intended for specific purposes and tasks.

As a result, we experienced degradation in the performance of core business processes.

No one was happy with this, so it was time to change something and move on.

Iteration number two. Separate analytical cluster

We were given one, and then we thought: “How do we transfer the data into it?”

The simplest solution is interval uploads: we periodically pour in all the deltas of changes accumulated during an interval. This can be done in a controlled, manageable manner – that is, we can choose hours when the load on the system is relatively light.

The advantages here are also obvious: there is a controlled manageable load, analytical queries are no longer executed on live data. Accordingly, you can delete all analytical indexes.

The database literally exhaled, degradation disappeared during insertion. It was a great achievement, we were all very happy!

But, unfortunately, as this solution was used, various problems emerged. Our interval uploads were based on the so-called date of the last operation (the business date). Various operations are performed on a product as it moves through the supply chain, and we drove the upload off the date of the last one: we simply took a time interval and unloaded all the operations that fell into it. The problem is that sometimes there were updates in the database that did not touch this business date, which means the delta of changes did not fall into the interval upload and the analytics effectively never saw the change.
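
To make this concrete, the interval upload essentially boiled down to a query along these lines (a simplified sketch; the collection and field names, such as last_op_date, are hypothetical):

from datetime import datetime, timedelta
from pymongo import MongoClient

client = MongoClient("mongodb://mongo:27017")  # hypothetical connection string
db = client["prod"]

end_dt = datetime.utcnow()
start_dt = end_dt - timedelta(hours=1)

# Upload everything whose business date of the last operation falls into the interval.
# Updates that do not touch last_op_date never enter this window, and deleted
# documents are simply absent from the result.
delta = db.product.find({"last_op_date": {"$gte": start_dt, "$lt": end_dt}})
for doc in delta:
    pass  # ship doc to the analytical cluster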

The second problem was that this scheme has no proper handling of the delete operation. If something is removed entirely, it is simply not in the delta! We had to literally bolt a crutch onto the side with electrical tape, which did not suit us at all. We struggled for some time, even tried to implement various reconciliations, re-uploads, top-ups and so on… In the end we got tired of it and moved on to the next stage: we decided to start from scratch and rewrite everything.

Iteration number three, final. Or CDC vs ETL

CDC or Change Data Capture is a mechanism that is actually built on top of the write-ahead log. In MongoDB this is oplog. The write-ahead log can be accessed via an API, plus there is a set of tools that allow you to read the log.

The biggest advantage of the CDC implementation in MongoDB is that all changes are tracked at the replication protocol level. We already had replication, since the database was deployed in a failover cluster topology. It turns out that as a bonus and completely free of charge, in addition to replication, we received a ready-made CDC.

In MongoDB, the CDC implementation is called Change Streams. There is a special API for working with it that allows you to track all changes.

Let me give you an example in Python:

cursor = db.inventory.watch(full_document="updateLookup")  # open a change stream on the inventory collection
document = next(cursor)  # blocks until the next change event arrives

In the example, we monitor changes to the inventory collection using the watch() method.

This method returns us a cursor through which we iterate and read all the changes that occur in the database.

A very cool feature in Change Streams is that you can track changes not only in one collection, but also at the level of the entire deployment. For those who are not familiar with MongoDB, let me remind you that a collection is analogous to a table in a regular database.

Plus Change Streams utilize all the power of the aggregation framework. For those who have not worked with MongoDB, but are familiar with relational DBMSs, let me explain that this is an analogue of GROUP BY. Yes, everything here is a little more powerful – it’s a whole framework that allows you to do filtering, aggregation and many other cool things.
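
Here is a rough sketch of what a deployment-level subscription with a filtering pipeline looks like through the raw API (the pipeline mirrors the filtering we later configure in the connector; the connection string and collection prefix are illustrative):

from pymongo import MongoClient

client = MongoClient("mongodb://mongo:27017")  # hypothetical connection string

# Watch every collection in the deployment whose name starts with a given prefix,
# keep only documents with entity_type == 0, and attach a static "storage" marker.
pipeline = [
    {"$match": {"$and": [
        {"ns.coll": {"$regex": "^coll_prefix"}},
        {"fullDocument.entity_type": 0},
    ]}},
    {"$addFields": {"storage": "MAIN"}},
]

with client.watch(pipeline, full_document="updateLookup") as stream:
    for event in stream:
        print(event["operationType"], event["ns"], event.get("fullDocument"))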

However, in addition to the API, the vendor provides a component called Mongo Connector. This component implements two modes:

• the first mode is Sink, which allows you to read data from a Kafka topic and save this data to MongoDB;
• the second mode is Source, which is exactly what we need. This mode allows you to track all changes in the database using Change Streams and publish them to a Kafka topic.

Cool! Why not try this solution?! Integration with Kafka is already ready.

The main Mongo Connector options and settings we used, each with a short description:

• connection.uri – DBMS connection string.
• database – name of the database whose changes are tracked.
• collection – name of the collection whose changes are tracked.
• topic.namespace.map – mapping to the names of the topics where CDC events are sent.
• poll.max.batch.size=10 – maximum number of documents in a batch for batch processing.
• poll.await.time.ms=1000 – maximum time to wait while collecting a batch of documents.
• pipeline=[{$match: {"$and": [{"ns.coll": {$regex: /^coll_prefix/}}, { "fullDocument.entity_type" : 0 }]}},{ $addFields: { "storage": "MAIN" } }] – pipeline for aggregation and filtering. Here we filter by the collection name prefix (ns.coll) and by the entity_type attribute equal to 0 (documents with a different value of this attribute are filtered out), and we also add a static "storage": "MAIN" attribute to the CDC event format.
• change.stream.full.document=updateLookup – return the full modified document; by default only the delta of changes is returned.
• mongo.errors.tolerance=all – continue processing if errors occur.
• mongo.errors.log.enable=true – log errors.
• mongo.errors.deadletterqueue.topic.name=deadletterOP – topic for erroneous documents.
• startup.mode=timestamp – submit all changes starting from the specified time.
• startup.mode.timestamp.start.at.operation.time – time from which to start reading the oplog after startup.
• bootstrap.servers – Kafka connection.
• offset.flush.interval.ms=90000 – offset flush interval.
• offset.flush.timeout.ms=17000 – Kafka write timeout.
• offset.storage.file.filename – file for storing the current offset.
• producer.max.request.size=18000000 – maximum request size for a Kafka write.
• producer.buffer.memory=8388608 – memory buffer size for flush operations.
• producer.enable.idempotence=true – idempotent writes.

The documentation for the Mongo Connector is very sparse; we figured many things out literally at random, or rather by reverse engineering, that is, by digging into the code of the connector itself 🙂

It is clear that there are settings for connecting to the DBMS. We can specify the database for which we want to track changes. We can specify the name of the collection on which we should monitor these changes. You can configure topic mapping, that is, where to write what changes. For example:

colA → topicA, colB → topicB and so on, including more complex mapping.

Naturally, the connector already supports batch processing to ensure high performance. It accumulates changes and stores them in Kafka. The accumulation policy is configured by the waiting time for changes or by their number.

I said earlier that Change Streams implement and use the full power of the Aggregation Framework. In our solution, we subscribe to data changes in several collections at once using a collection name prefix. The name prefix is specified as a regular expression.

Next, using filtering, we analyze only the documents we need in these collections: if a document contains the entity_type field with a value of 0, it suits us; all other documents are discarded. Plus, we immediately add static metadata – for example, a storage field with the value MAIN, which indicates the source of the data so that the consumer can use this metadata for their own needs.
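
To tie the settings together, here is a rough sketch of registering such a source connector through the Kafka Connect REST API (assuming the connector runs under a Kafka Connect worker in distributed mode; the host names, topic name and connection string are made up, and the exact property set should be checked against your connector version):

import json
import requests

config = {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo:27017",   # made-up connection string
    "database": "prod",
    # namespace -> topic mapping; the namespace and topic names are illustrative
    "topic.namespace.map": json.dumps({"prod.coll_prefix_a": "cdc.events"}),
    "poll.max.batch.size": "10",
    "poll.await.time.ms": "1000",
    "change.stream.full.document": "updateLookup",
    "pipeline": json.dumps([
        {"$match": {"$and": [
            {"ns.coll": {"$regex": "^coll_prefix"}},
            {"fullDocument.entity_type": 0},
        ]}},
        {"$addFields": {"storage": "MAIN"}},
    ]),
}

resp = requests.post(
    "http://connect:8083/connectors",            # hypothetical Connect worker address
    json={"name": "mongo-cdc-source", "config": config},
    timeout=30,
)
resp.raise_for_status()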

The next important parameter is the operating mode of the connector. By default, if you do not set this mode (and, by the way, it is not set!), then Change Streams simply return the change delta to you. And then, having a delta of changes, you must somehow get the changed document. Sounds complicated…

We danced around with a tambourine for a long time, looked at various other modes, and finally found the updateLookup mode. It gives us the actual full, consistent modified document, and not just a delta of changes plus a description of those changes.

In addition, the connector implements error handling logic out of the box – error tolerance. There is no need to implement it separately in your application. Take and use what is ready.

The next plus is the convenience and flexibility of reading the oplog write-ahead log. You can read it from the very beginning or from a specific point in time. This feature is convenient for reprocessing.
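
The raw Change Streams API exposes the same capability; a minimal sketch of replaying changes from a given point in time (the connection string and the offset of one hour are illustrative):

import time
from bson.timestamp import Timestamp
from pymongo import MongoClient

client = MongoClient("mongodb://mongo:27017")  # hypothetical connection string

# Replay everything recorded in the oplog since one hour ago (seconds since the epoch, increment 0).
start_at = Timestamp(int(time.time()) - 3600, 0)
with client.watch(start_at_operation_time=start_at) as stream:
    for event in stream:
        print(event["operationType"], event["ns"])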

A very cool feature is idempotent processing in the connector itself. Again, no need to worry about this at the application level.

Let's compare the bare API and Connector:

• Requires integration with the MongoDB Change Streams API in the application: Change Streams API – yes; Connector – no.
• Supports Sink mode: Change Streams API – no; Connector – yes.
• Scaling out of the box: Change Streams API – no; Connector – yes, via Kafka, by adding partitions to a topic.
• Load damping out of the box: Change Streams API – no; Connector – yes, the queue in Kafka.
• Complexity and cost of infrastructure and operation: Change Streams API – simpler and cheaper; Connector – more complex and more expensive (requires a Kafka cluster plus capacity for the Connector).
• Maximum integration flexibility: Change Streams API – yes; Connector – no, limited by the Connector's capabilities and tied to Kafka.

What does the format of the CDC Event itself look like?

{
  "schema": {
    "type": "string",
    "optional": false
  },
  "payload": "{}"
}

The most interesting part of the format is the payload:

{
  "_id": {
    "_data": "826447837900000D5B2B022C0100296E5A1004AF54C14"
  },
  "operationType": "update",
  "clusterTime": {
    "$timestamp": {
      "t": 1682408313,
      "i": 3419
    }
  },
  "fullDocument": {},
  "ns": {
    "db": "prod",
    "coll": "product.history"
  },
  "documentKey": {
    "entity.id": “11103671001152nZ5zeBdkrqnMo",
    "_id": {
      "$oid": "643d30d39ea769ba2c8a4ffb"
    }
  },
  "updateDescription": {
    "updatedFields": {},
    "removedFields": []
  }
}

I would like to pay special attention to the operationType field. When describing the interval-upload solution, I pointed out the problem of being unable to properly implement handling of the delete operation. Here it is solved: there is a delete operation type, that is, a special CDC event for it.

Next comes the document itself: fullDocument is the already modified document. That is, if the updateLookup mode is enabled, the full, consistent modified document will be written to this field.

I almost forgot to mention clusterTime: this is the time when the data change actually happened at the database level. It is a very important parameter; we use it for data slicing, to cut off historical data by timestamp.
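
A minimal sketch of such a cutoff, assuming the payload has been parsed into a dict shaped like the example above (the cutoff date is illustrative):

from datetime import datetime, timezone

CUTOFF = datetime(2023, 1, 1, tzinfo=timezone.utc)  # illustrative cutoff

def is_historical(event: dict) -> bool:
    # clusterTime carries seconds since the epoch in the "t" field of $timestamp
    seconds = event["clusterTime"]["$timestamp"]["t"]
    return datetime.fromtimestamp(seconds, tz=timezone.utc) < CUTOFF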

And finally the last structure – updateDescription. This is, in fact, a description of those document fields that have changed and/or been added. You can also use all this metadata in your business logic if necessary.

What does the overall enlarged solution architecture look like?

I will not demonstrate any monstrous architecture here. We will look at a simplified diagram that everyone can understand:

Let me briefly dwell on the CDC Processor element: it is our microservice that performs additional post-processing of CDC events.

Another interesting box in the diagram is the set of elements for reprocessing. Unfortunately, we were unable to abandon reprocessing and a kind of manual synchronization entirely, but we have fully automated them. We created a special mode for manual synchronization and reprocessing, which is actually implemented as a set of commands sent into a special Kafka-based control queue.

CDC Processor reads these commands, executes them and sends the execution result to our analytical consumers.

Is it possible to do without CDC Processor in the most general case? Of course you can! I am sure that 90% of problems can be solved simply by using the functionality of the Mongo Connector itself. In our case, additional filtering, slicing and enrichment were required, as well as a change of format for analytical consumers – for them we cut the entire history of the movement of goods into individual events and cut off historical ones. What are the advantages of CDC Processor for the future development of the solution? In the future, we can make it more universal and implement all custom logic in the form of plugins.

Let's move on to monitoring: where would production be without it? From my point of view, running a system in production without monitoring is suicide. As part of monitoring, we of course watch the status of the architectural elements themselves, but the most important thing is queue monitoring. The absence of LAG growth in the incoming queues indicates that the application is steadily processing the entire incoming load with the required performance. Growing LAG indicates that the application cannot cope and it is time to scale it out horizontally, that is, to add more processing instances.
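
For instance, consumer-group lag can be checked with a few lines of the kafka-python client (a sketch only; the broker address and consumer group name are made up, and in practice this would live in regular monitoring tooling rather than an ad-hoc script):

from kafka import KafkaAdminClient, KafkaConsumer

BOOTSTRAP = "kafka:9092"      # hypothetical broker address
GROUP = "cdc-processor"       # hypothetical consumer group

admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP)
consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP)

committed = admin.list_consumer_group_offsets(GROUP)   # {TopicPartition: OffsetAndMetadata}
latest = consumer.end_offsets(list(committed))         # {TopicPartition: log end offset}

for tp, meta in committed.items():
    lag = latest[tp] - meta.offset
    print(f"{tp.topic}[{tp.partition}] lag={lag}")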

It's the same story with outgoing queues: our analytical consumers must also keep up and read everything we ship to them… Of course, Kafka has retention, but that is there to absorb peak loads and accidents.

So what have we built?

The results of our long journey, comparing the two approaches, ETL and CDC:

• Reliable synchronization of the operational storage with the analytical cluster, without loss: ETL – no, it is tied to the business date of the last operation, which is not always updated, and there is no proper handling of deletes; CDC – yes, we receive every document change, including the delete event.
• Minimal data lag in the analytical database: ETL – no, the change history is uploaded at intervals, so the data lags; CDC – yes, we receive all changes in real time.
• Minimal, uniform additional load on the source during synchronization, including during peak-load hours: ETL – no, the load is higher than with CDC, because an additional query is made for all changes in the interval [start_dt, end_dt]; CDC – yes, we work at the oplog level on top of the replication protocol.
• High fault tolerance in case of failures: ETL – no, there are many points of failure in the ETL pipeline itself, which are difficult to control in terms of potential data loss during transfer; CDC – yes, there is a built-in error-tolerance mechanism and reprocessing is possible.

That is how the experiment turned out. I would like to say that all the tricks and features were performed by professionals, so do not try to repeat them yourselves! But on the other hand, maybe it will be useful for someone to repeat them and draw their own conclusions 🙂
