Data loss during replication to analytical storage – automatic reconciliations and data quality monitoring

In our architecture, data from production databases flows asynchronously into the analytical storage (ClickHouse), where analysts build dashboards for product teams and run ad-hoc queries. The databases are large and under noticeable load: every day we send off a fleet of planes the size of an average airline's, several trains, and a whole bunch of buses. So there is a lot of interaction with the product.

An ETL process (extracting data, transforming it, and loading it into the storage) often involves complex transfer logic, and at first there is no certainty that the data is delivered without losses or errors. We use Kafka as a data bus and intermediate Benthos services to transform records and send them to ClickHouse. While building the pipeline, we had to make sure that nothing was lost on our side and that the logic of writing to the data bus was correct.

We did not want to check the discrepancies manually every time; besides, we needed a service that could check new data on a schedule and show clearly where the discrepancies are and what they look like. So we built a reconciliation service, which I will describe here, because I did not find any ready-made solutions.


What is a reconciliation service

The service takes data from the source database (in our case a replica of the production database: MySQL, MongoDB), takes data from the database the data is delivered to (ClickHouse), compares the two, and reports how correctly the data arrived.

If there are 10 thousand records in one database and 10.1 or 9.6 thousand in the other, how do you figure out what exactly did not arrive? Will you look manually or write a reconciliation script? After a few manual reconciliations, the gap between a reconciliation script and a reconciliation service is not that big.

Data gets into the analytical database with logic that differs from the production databases. Records can be transformed or combined from several tables into one flat table, so that analysts do not have to do complex joins later. During comparison, the same transformations are applied so that both sides have the same shape, as in the sketch below. Then it becomes clear where the problems are.
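
For illustration, here is a minimal PySpark sketch (the table and column names are hypothetical, not our real schema): the source side is assembled into the same flat shape that the ETL produces in ClickHouse, so the two DataFrames can be compared record by record.

# Hypothetical example: the ETL flattens an order from three source tables,
# so for comparison we build the same flat view over the source replicas.
flat_source = (
    orders_df
    .join(payments_df, on="order_id", how="left")
    .join(tickets_df, on="order_id", how="left")
    .select("order_id", "state", "amount", "ticket_number")
)
# flat_source now has the same columns as the flat table in ClickHouse
# and can be compared with it record by record.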

Where can there be problems

Problems occur at different points along the path:

  1. Data may not be sent to the bus at all. For example, the logic is complex and the order model is assembled from three tables; somewhere a send can be missed and the data never arrives. It is important to understand exactly which data does not arrive, otherwise checking by hand would take roughly forever.
  2. Transfer problems. The data was written to the bus incorrectly, read incorrectly, or transformed the wrong way. In this case the data ends up in a different form, starting with extra spaces or other whitespace and ending with, say, plain addition instead of addition modulo.
  3. Technical problems. For example, the network flapped and not all the data arrived.

In the end, I can say that only tenths of a percent of our discrepancies were caused by technical network problems. Everything else was logic.

When we moved the first streams to ClickHouse, we suggested that analysts use this data, because it is more convenient and the analytical database better fits the query patterns. From the very first queries it was clear that the data diverged, but the scale of the problem was unclear. Reconciliation then revealed a lot of issues, up to 70% of discrepancies in one table. Automatic checks helped us identify every type of error; without them you either never notice this, or work around it with exceptions, or write a pile of one-off check scripts from time to time. That takes too long, data streams keep growing, and checking by hand is inconvenient.

How We Implemented It

What technologies?

When choosing the technology stack, we assumed there would be a lot of data and that in the future we would need to check dozens of tables every day. The service was written in Python + Spark, which looked like a good candidate given its query parallelism and data processing capabilities.

We collect data
To begin with, we had to understand how to collect data uniformly from different databases: our company uses MySQL, MongoDB, ClickHouse, and so on. JDBC drivers with Spark handle this well, which allowed us to connect to different databases in a uniform way, pull data out of them, and work with it as DataFrames (this logic is concentrated in the Data Provider entity on the diagram).
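
Roughly, reading a table through JDBC looks like this; a minimal sketch in which the connection string, table name, and credentials are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("reconciliation").getOrCreate()

# Read a source table via JDBC; the MySQL driver jar must be on the classpath.
source_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql-replica:3306/orders_db")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "orders")
    .option("user", "reader")
    .option("password", "***")
    .load()
)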

Divide into partitions
Each database has its own syntactic features of SELECT queries. Since we have a finite amount of service memory and we do not want to affect the databases with our requests, we will pull out the data in partitions. We define the boundaries of SELECT queries ourselves, Spark will take care of the rest: it has its own partitions inside that allow you to perform all operations in parallel – from collecting data to processing them.
As soon as we have received the necessary data from the tables, we turn it into a DataFrame and then we work individually for each table.
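
Spark's JDBC source also has built-in options for partitioned reads. In our case the service builds the bounded SELECT queries itself (the query template is shown later), but the standard alternative looks roughly like this; the column name and bounds below are assumptions, not our exact settings:

# Partitioned JDBC read: Spark issues one SELECT per partition,
# splitting the [lowerBound, upperBound] range of the id column.
partitioned_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql-replica:3306/orders_db")
    .option("dbtable", "orders")
    .option("user", "reader")
    .option("password", "***")
    .option("partitionColumn", "id")
    .option("lowerBound", "1")
    .option("upperBound", "50000000")
    .option("numPartitions", "64")
    .load()
)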

Define tables
The next step is to define the logic of working with the data in each specific case: describe a pair of tables, their structure, and the data preparation. Ideally without unnecessary hassle and in a way that everyone can understand.

In the best case, you only need to specify the names of the fields that should be checked against each other. In the worst case, you first need to convert the data to a common form.

What cases should be immediately taken into account?

  1. Data can be aggregated: data from several source tables can end up in one flat table in the target storage.
  2. Data can change according to some logic.
  3. Different databases may have their own peculiarities in data types.
  4. Data can initially be stored as JSON (hello, document-oriented DBMSs).

Of course, this list is not exhaustive, but it covers more than 90% of cases.
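
For cases 3 and 4, a small normalization step before comparison is usually enough. Here is a hedged sketch; the field names and rules are made up for illustration and are not our production mapping:

import json
from datetime import datetime

def normalize_source_value(field: str, value):
    """Bring a raw source value to the same form the target storage uses."""
    if value is None:
        return None
    if field == 'payload':
        # A document-oriented DB may store the record as JSON text.
        return json.loads(value).get('state')
    if isinstance(value, datetime):
        # Example rule: compare dates as 'YYYY-MM-DD HH:MM:SS' strings.
        return value.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(value, str):
        # Strip accidental whitespace before comparing.
        return value.strip()
    return value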

We define reconciliation
In total, in the simplest case, a new reconciliation requires only two definitions: a field mapping and a reconciliation configuration:

# Field mapping: which source column is compared with which target column.
SPECIFIC_MAP_SCHEMA = [
    M(source_field='id', target_field='id'),
    M(source_field='state', target_field='state'),
    M(source_field='cdate', target_field='cdate'),
    M(source_field='mdate', target_field='mdate'),
]

# Reconciliation configuration: which fields are used to join records
# from the two sides and to split the data into partitions.
SpecificMapConfig = MapConfiguration(
    source_join_field='id',
    target_join_field='ch_id',
    source_partition_field='id',
    target_partition_field='ch_id',
)

For example, this is how a SELECT query with the passed parameters is formed in the code:

query = """
   select {comparison_fields}
   from {db_table}
   where {part_field} > {lower_bound} AND {part_field} <= {upper_bound} AND {filter_condition}
   order by {part_field}
""".format(
   comparison_fields=self.get_query_columns,
   db_table=db_table,
   lower_bound=lower_bound,
   upper_bound=upper_bound,
   part_field=self.partition_field,
   filter_condition=self.filter_condition
)

Direct reconciliation stage

At this stage, we have two DataFrames with a lot of data (partitions) that need to be checked. What does that mean? We need to count the number of differing values for each ID and log examples of these differences. To do this, we join the data by record ID:

import typing as t

import pyspark.sql.functions as sf
from pyspark.sql import DataFrame


def get_join_condition(df1: DataFrame, df2: DataFrame, df1_join_field: str, df2_join_field: str) -> t.List[t.Any]:
    # Build a condition that is True when at least one of the compared
    # column pairs differs between the two sides.
    fields_join_condition = None
    for df1_col, df2_col in zip(df1.columns, df2.columns):
        if df1_col == df1_join_field and df2_col == df2_join_field:
            continue

        if fields_join_condition is None:
            fields_join_condition = (sf.col(df1_col) != sf.col(df2_col))
            continue

        fields_join_condition |= (sf.col(df1_col) != sf.col(df2_col))

    # Join on equal IDs where at least one value differs.
    return [df1[df1_join_field] == df2[df2_join_field], fields_join_condition]


# data1.join_field and data2.join_field come from the table configurations described above.
joined = df1.join(
    df2,
    get_join_condition(df1, df2, data1.join_field, data2.join_field),
    how='inner'
)

This function works quickly, and it is enough to calculate statistics for the entire table. It also performs well on big data (tens of millions of records with hundreds of columns).
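
From here, the table-level numbers can be obtained with a couple of aggregations; a minimal sketch, assuming df1, df2 and joined are the DataFrames from the snippet above:

total_rows = df1.count()        # records on the source side
mismatched = joined.count()     # records present on both sides whose values differ

# Share of records whose values diverge between the two sides.
mismatch_pct = 100.0 * mismatched / total_rows if total_rows else 0.0
print(f"{mismatched} of {total_rows} records differ ({mismatch_pct:.1f}%)")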

When something went wrong
The difficulty began when we decided to calculate statistics for each column separately (the percentage of discrepancies plus examples). Here the complexity of working with Spark became fundamental for us. After some research we concluded that there was no simple and obvious way to speed up this section of code. We could either plug in a Spark extension library written in Scala, or keep looking for an intricate solution within the DataFrame API, which failed after several attempts. We tried different partitioning parameters, but could not reach the desired speed of comparing tables column by column. The very existence of dedicated Scala libraries for computing such data differences suggests there is no ready-made, easy DataFrame API for the statistics we need.

Spark turned out to be a rather complex development tool that requires extra knowledge of its internals, and we did not have much time for in-depth study within the reconciliation project.
This does not mean the tool should not be used in your projects; we use it in other services. But its complexity is an important factor when choosing a technology stack. We ran into it in the reconciliation service, gained new experience, concluded that we should drop Spark in this project, and moved on.

What are the alternatives to working with Spark DataFrames? The popular Pandas and NumPy analytics libraries, of course, which also work with DataFrames and data arrays.

Just as an example: to calculate statistics for all columns at once, with vectorized NumPy functions it is enough to compare two matrices of the same shape:

np.array(diff_df[columns1]) == np.array(diff_df[columns2])

We get a matrix of the same shape with True and False for matching and mismatching values. The columns can then be summed to obtain the percentage of discrepancies.
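
A minimal sketch of that summation, assuming diff_df is a Pandas DataFrame holding both sides and columns1 / columns2 are the corresponding column lists from the snippet above:

import numpy as np

# Element-wise comparison of the two sides; True marks a mismatch.
mismatch_matrix = np.array(diff_df[columns1]) != np.array(diff_df[columns2])

# Fraction of mismatches per column, as a percentage.
per_column_pct = mismatch_matrix.mean(axis=0) * 100
for column, pct in zip(columns1, per_column_pct):
    print(f"{column}: {pct:.2f}% of values differ")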

And if you need to redefine the comparison operator:

diff = np.where([
    self.compare_columns(diff_dataframe, join_field, column1, column2)
    for column1, column2 in zip(df1.columns, df2.columns)
])

And it still works fast!

In total, with the Spark implementation that iterated over pairs of columns, a table of tens of millions of records with more than 100 columns took longer on our join-based implementation than with the NumPy implementation above. The latter finished in 4-6 hours. No multithreading or other complications were needed; it is enough to split the data into partitions so that they fit into memory. This approach worked right away.

Since Spark's main feature, parallelization, turned out not to be useful to us in the end, we plan to remove this technology from the project in the future.

Reconciliation results

What do we get as a result of reconciliation?

How do we use these results?

They help us generate a report for the team that is responsible for this data so that they can fix any problems in data delivery.

Also, these results can be presented to analysts and other consumers of our products, so that they know which data they can fully rely on and which is still awaiting improvements.

How do we calculate discrepancies?
We take into account the number of rows and the number of differing values in the columns.

If we take the two tables from the picture below, the math works out as follows:
Number of columns: 3.
Number of rows: 4.
Total values: 3 * 4 = 12.
Number of differing values: 4.
Percentage of discrepancies in values: (4 / 12) * 100 = 33%.
Difference in the number of rows: 1.
Difference in the number of rows, taking the number of columns into account: 1 * 3 = 3.
Percentage of discrepancies in values, including missing records: ((4 + (1 * 3)) / 12) * 100 = 58.3%.

Overall discrepancy percentage: 58.3%.
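
The same arithmetic in a small Python sketch, using the numbers from the example above:

columns = 3
matched_rows = 4      # rows present in both tables
value_diffs = 4       # differing values among the matched rows
missing_rows = 1      # rows missing on the target side

total_values = columns * matched_rows                               # 12
value_pct = value_diffs / total_values * 100                        # 33.3%
missing_values = missing_rows * columns                             # 3
overall_pct = (value_diffs + missing_values) / total_values * 100   # 58.3%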

Monitoring

We use this service not only for one-off checks but for daily monitoring of the quality of data that arrives in near real time. We use Airflow to run regular table reconciliation tasks and display the quantitative results on a dashboard. When needed, we can also quickly add reconciliations for the last day/week/month/all history.
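
Scheduling looks roughly like this; a minimal Airflow sketch in which the DAG id, schedule, and the run_reconciliation entry point are assumptions, not our exact code:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from reconciliation import run_reconciliation  # hypothetical entry point of the service

with DAG(
    dag_id="daily_reconciliation",
    schedule_interval="0 2 * * *",   # run at night, when the load on the sources is minimal
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    reconcile_orders = PythonOperator(
        task_id="reconcile_orders",
        python_callable=run_reconciliation,
        op_kwargs={"table": "orders"},
    )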

SELECT queries do not put much load on the source databases, but just in case we still run them at night, when the load is minimal. We have never observed reconciliation increasing the load in any noticeable way.

Thanks to monitoring, anyone in the company can open the dashboard at any time and check how correct the data in the analytical warehouse is. For example, if unexpected changes show up in business metrics, the first thing to do is make sure everything is OK with the data in the analytical store. Now that can be done without extra back-and-forth.

How it is supported

Nobody tries to verify a database of billions of records directly. The easiest approach is to compare record counts: 10 thousand in one database, 10 thousand in the other, so the data must be correct. But the values may still differ. The question is whether the percentage of losses is huge or negligible.

In an ideal world, a mapper for reconciling two tables would be written by the development team itself as soon as a data stream to the analytical storage is created, but we have not yet polished this process enough to hand that step over to developers from product teams. So for now such a mapper is made by the infrastructure data team (that is, us). We also handle the investigations. It takes roughly one working day per table: very little for the mapper and much more for analyzing what happened to the undelivered data. Naturally, there are hard cases with large aggregations that take several days to untangle, and sometimes a table magically transfers without any losses.

Instead of a conclusion

Already at the first stage, reconciliations helped us fix about a dozen data streams. Analysts began to trust the data in the analytical warehouse; new data marts will be built on it and old ones will be migrated.

Going forward, we plan to simplify the creation of new reconciliations, drop Spark completely, and add new streams on demand.
