Our journey of migrating on-prem analytics to the cloud

Hi, my name is Vladislav Melnikov. I am an expert on the analytical data platform team of the Supply Chain Management project at Magnit Tech.

Background

In September 2022, Magnit launched a project to move analytics from on-premises hardware to the cloud. The benefits were obvious: the company was undergoing a fairly active digital transformation, which required easily scalable computing power, and the pressing issue of import substitution had also arisen. In effect, the company had to choose both a migration path and the tools it would work with. The decision was made to migrate to Yandex Cloud.

At the start of the migration, our sources were Teradata, MS SQL, Oracle, and Hadoop (historical data).

Selected stack

At the time of the transition, the company had chosen the following solution stack for our problem:

  • Airflow – orchestrator

  • S3 – storage for source and computed data

  • Spark – calculation engine

  • Clickhouse – data marts for reports

  • Apache Superset – reporting

There are many articles on all of these tools in English, and slightly fewer in Russian. The purpose of this article is to describe our solution and the pitfalls we ran into during the migration.

Concept

The general concept was a Pipeline: Teradata -> S3 -> (transformation with Spark) -> Clickhouse.

In general, the concept has not changed, only been slightly modernized. For example, two more database types were added as sources – MS SQL and Oracle. We also had to keep service records for incrementally loaded tables in PostgreSQL instead of S3, spin up YDB for multi-cluster mode (more on this below), and so on.

Let's consider the implementation in stages.

Unloading and transformations

Unloading from Teradata

Initially, the task seemed simple: run a SELECT against the database and save the unloaded data to parquet. But here we ran into difficulties. The first was the volume of data being transferred. Initial attempts to use JDBC drivers were unsuccessful, since we had to transfer a daily volume of about 220 million rows * 97 columns for a single table to the cloud (about 10 GB as compressed CSV). In addition, the optimizer pushed our flat queries against Teradata PROD to the very end of the queue. The next iteration of the unload used the native Teradata utility tbuild, which implements the TPT mechanism. Optimal settings were found empirically – 10 streams of 200 MB each. Files were uploaded directly to S3 in compressed form (gzip). As a result, the unload time dropped to 2–4 hours. All of these unloads were orchestrated with Airflow.

Since spending 2 hours (let alone 4) unloading a single day of one table did not suit us, we used Airflow's dynamic task mapping and split the unload into N streams. At the moment N=10, since we unload several tables and the volume unloaded at a time is limited by the account settings. The split into streams is defined as the remainder of dividing a more or less uniformly distributed ID by N. The unload now takes about 15 minutes, depending, of course, on the load on the source server.
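Schematically, the split looks like the sketch below – a minimal Airflow dynamic task mapping example. The TPT job script name, its variable names and the way it lands files in S3 are assumptions for illustration, not our production code.

```python
import subprocess

import pendulum
from airflow.decorators import dag, task

N_STREAMS = 10  # number of parallel unload streams


@dag(schedule="@daily", start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def teradata_unload():

    @task
    def make_stream_ids():
        # one mapped task instance per remainder of ID % N_STREAMS
        return list(range(N_STREAMS))

    @task
    def unload_stream(stream_id, ds=None):
        # Schematic tbuild (TPT) call: the job script is assumed to select rows
        # WHERE some_uniform_id MOD :STREAMS = :STREAM_ID and write gzipped
        # files straight to S3.
        subprocess.run(
            [
                "tbuild", "-f", "export_to_s3.tpt",
                "-u", f"STREAM_ID={stream_id}, STREAMS={N_STREAMS}, LOAD_DATE='{ds}'",
            ],
            check=True,
        )

    unload_stream.expand(stream_id=make_stream_ids())


teradata_unload()
```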

Unloading from MS SQL and Oracle

As mentioned above, it turned out that we also needed to pull data from MS SQL and Oracle. Fortunately, the tables unloaded from these databases were smaller.

The problem was that we could not use, say, the MS SQL bcp utility, so we had to unload via JDBC. In Spark, a JDBC read can be parallelized. However, after testing it, we abandoned this implementation. Why? For a parallel read, JDBC must be given the partitioning column, the lower and upper bounds of its values, and the number of partitions. Such a definition does not guarantee uniformly sized streams. Moreover, if you look at the query plan, you will see that JDBC simply runs SQL queries in parallel with automatically generated WHERE constraints.
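For reference, the built-in parallel JDBC read that we abandoned looks roughly like this (the connection string, table and bounds are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-parallel-read").getOrCreate()

# Spark turns this into numPartitions queries, each with an automatically
# generated WHERE range on partitionColumn between lowerBound and upperBound.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://mssql-host:1433;databaseName=dwh")  # placeholder
    .option("dbtable", "dbo.sales_facts")                                # placeholder
    .option("user", "etl_user")
    .option("password", "***")
    .option("partitionColumn", "product_id")  # must be numeric, date or timestamp
    .option("lowerBound", "1")
    .option("upperBound", "10000000")
    .option("numPartitions", "10")
    .load()
)

# If product_id is skewed, the generated ranges end up very uneven in size,
# which is exactly why this approach did not work for us.
```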

The same can be organized in PySpark using the Python concurrent.futures or multiprocessing libraries, and this mechanism is more flexible. There are many articles on the Internet describing how to set it up, so I will not describe it in detail here.
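Still, to give the general idea, here is a rough sketch with concurrent.futures. The connection string, table name and modulo predicate are purely illustrative, and the Spark session is assumed to already have deltaLake support configured.

```python
from concurrent.futures import ThreadPoolExecutor

from pyspark.sql import SparkSession

N_STREAMS = 10


def read_stream(spark, stream_id):
    # Each stream issues its own JDBC query; the modulo predicate gives
    # roughly even streams regardless of the actual ID range.
    query = (
        "(SELECT * FROM dbo.sales_facts "
        f"WHERE product_id % {N_STREAMS} = {stream_id}) AS src"
    )
    return (
        spark.read.format("jdbc")
        .option("url", "jdbc:sqlserver://mssql-host:1433;databaseName=dwh")  # placeholder
        .option("dbtable", query)
        .option("user", "etl_user")
        .option("password", "***")
        .load()
    )


def unload_stream(spark, stream_id, path):
    # Each thread triggers its own Spark job; append-only writes to the same
    # Delta table do not conflict with each other.
    read_stream(spark, stream_id).write.format("delta").mode("append").save(path)


def parallel_unload(spark, path):
    with ThreadPoolExecutor(max_workers=N_STREAMS) as pool:
        list(pool.map(lambda i: unload_stream(spark, i, path), range(N_STREAMS)))


if __name__ == "__main__":
    session = SparkSession.builder.appName("parallel-jdbc-unload").getOrCreate()
    parallel_unload(session, "s3a://dwh-raw/sales_facts/")  # placeholder path
```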

Transformations in S3

So, the next, second stage was converting the *.gz files (the Landing layer) into deltaLake files (the RAW layer) needed for further work. For this we use the DataProc service with Spark. At the beginning of the transition, we used a permanently running DataProc cluster; what we moved to later, I will describe below.
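A minimal sketch of this Landing-to-RAW conversion; the bucket names, CSV options and partition column are assumptions:

```python
from pyspark.sql import SparkSession, functions as F

spark = (
    SparkSession.builder
    .appName("landing-to-raw")
    # deltaLake support on the DataProc cluster
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

load_date = "2024-06-01"  # normally taken from the Airflow run context

# gzipped CSV files unloaded by TPT (Landing layer); path is illustrative
landing = (
    spark.read
    .option("sep", "\t")
    .option("header", True)  # header presence is an assumption
    .csv(f"s3a://dwh-landing/teradata/sales_facts/{load_date}/")
)

# RAW layer: the same data as a date-partitioned Delta table
(
    landing
    .withColumn("load_date", F.lit(load_date))
    .write.format("delta")
    .mode("append")
    .partitionBy("load_date")
    .save("s3a://dwh-raw/sales_facts/")
)
```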

All converted tables are stored in S3. This is done because, firstly, Clickhouse hosts our final data marts and the cluster is constantly loaded with BI user queries even without additional calculations. Secondly, Clickhouse is a columnar OLAP database which, at the time we started, did not handle JOINs very well and was fastest on denormalized tables, so there was no need to store normalized data in Clickhouse. Another point is the calculations themselves: some of them use several tables with a processing depth of up to 3 years, i.e. about 240 billion rows. We decided that for such volumes it was logical to use Spark.

The third stage is the various calculations and the denormalization of data for one-to-one loading into Clickhouse data marts (the DDS layer).

I don't think anything more needs to be added: this division into stages exists in almost any DWH, and we haven't invented anything new here.

What we came across while implementing these stages.

It turned out that the data in our sources changes retroactively, and sometimes to a fairly large depth. Re-unloading such volumes in full every day is very time-consuming; in most cases we cannot afford it, although for some tables we do. For other tables, we had to create additional tables on the source side containing only the changes for previous days, and then apply those changes in Spark. In theory, standard deltaLake mechanisms could be used for this, but for the reasons described below we abandoned them.

For unloads from MS SQL and Oracle, we decided not to store intermediate data in the Landing layer, since we could write straight to the RAW layer: on the source side we had no way to set up CDC and no push increment. In practice, we simply take the full daily data for the past days once a day.

Orchestration

The main purpose of Airflow is to orchestrate data flows. There is an opinion that if you provide enough resources and turn Airflow into a large cluster, you can even run calculations in it with various Python libraries such as Pandas. We abandoned this approach; for us, Airflow is something like a convenient shell for running cron. All calculations are carried out in the specialized tools intended for them.

On to the implementation. Initially, we combined all the pipeline stages into one DAG in various ways. However, for one reason or another the DAGs failed with enviable regularity; the reasons varied, and 99% of the time they came down to the availability of the source. In theory, a DAG can be restarted with standard Airflow tools, but as it turned out, this only works for daily runs under constant monitoring of the unload. When unloading historical data from many tables, the instability kept pulling our already small team of data engineers away from development. So we decided to make the loading asynchronous.

That is, when loading incremental data (facts), each of the stages in Airflow is launched separately and independently. If we consider the chain Unload -> Transform -> Load: suppose the unload worked but the transformation failed for some reason. The unload then starts pulling in a new batch of data, and the transformation can pick up the first unloaded batch or both unloaded batches at once. This minimized processing time and gave us stability. In addition, monitoring was configured for each DAG (as a separate task) which, in case of failure, emailed the relevant recipients. The fact that a batch had been processed was recorded in *.json files in S3; at some point this stopped satisfying us, and we have since moved it to PostgreSQL.

Loading of reference (dimension) tables, given their small size and fast processing, is organized as a single DAG that triggers several DAGs in sequence, one per stage. In the figure below, a separate step has been added – check_dag_state. We arrived at the need for it later. Clickhouse, as it turned out, is a rather capricious database: during maintenance, you have to disable all DAGs that load into it. And if the DAG that loads into Clickhouse is disabled, the entire parent DAG hangs waiting for the disabled sub-DAG to run, which leads to incorrect transformations in the DDS layer. The current implementation lets us disable the Clickhouse loading DAG without any consequences for the other Spark calculations.
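In spirit, check_dag_state does something like the sketch below, built on Airflow's DagModel; the target DAG id is hypothetical.

```python
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.models import DagModel


@task
def check_dag_state(target_dag_id: str):
    """Skip the downstream trigger if the target load DAG is paused
    (for example, during Clickhouse maintenance)."""
    dag_model = DagModel.get_dagmodel(target_dag_id)
    if dag_model is None or dag_model.is_paused:
        raise AirflowSkipException(
            f"{target_dag_id} is paused or missing - skipping the Clickhouse load"
        )


# inside the parent DAG, roughly:
# check_dag_state("load_dims_to_clickhouse") >> trigger_clickhouse_load
```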

To run heavy transformations with the least cost in time and money, we decided to use dynamically created clusters instead of permanent ones; fortunately, the cloud allows this. The difference in resources spent is not significant, but in time it lets us parallelize the work. From Airflow, we create a Spark cluster in the configuration we need, run the calculation, and delete the cluster. In some cases, we process up to 6 partitions of the same table at a time. Of course, there are nuances in such processing here too. I will describe this below.

A typical DAG looks like the graph below; the screenshot shows 2 clusters running in parallel.
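In code, the create-compute-delete pattern looks roughly like the sketch below. It assumes the operators from the apache-airflow-providers-yandex package; the import path, resource presets and job URI are illustrative and may differ between provider versions.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.yandex.operators.yandexcloud_dataproc import (
    DataprocCreateClusterOperator,
    DataprocCreatePysparkJobOperator,
    DataprocDeleteClusterOperator,
)

with DAG(
    "dds_calc_on_ephemeral_cluster",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        cluster_name="dds-calc-{{ ds_nodash }}",
        # sizes and counts are illustrative
        masternode_resource_preset="s3-c4-m16",
        datanode_resource_preset="s3-c8-m32",
        datanode_count=4,
        services=("SPARK", "YARN"),
    )

    run_calc = DataprocCreatePysparkJobOperator(
        task_id="run_calc",
        main_python_file_uri="s3a://dwh-jobs/dds/calc_sales.py",  # illustrative
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        # delete the cluster even if the job failed
        trigger_rule="all_done",
    )

    create_cluster >> run_calc >> delete_cluster
```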

What we came across: we had to modify the standard operators for working with Yandex DataProc so that values could be passed to some fields from other tasks. Another point is deleting clusters. During debugging, there were cases when clusters were created, DAGs failed, and the clusters were not deleted – and we could suddenly find 70 running but idle clusters. So, on the one hand, the cluster had to be deleted unconditionally; on the other hand, we needed the logs of the failed scripts (and the logs are deleted along with the cluster). To handle this, we save the logs to S3 when a task fails. In addition, it turned out that there are cases when cluster creation stops due to a lack of quotas: the cluster is considered up, but the DAG operator returns an error. For this case, we added extra processing that looks up the DataProc cluster ID by its name via the cloud API (in the figure above, this is the get_dataproc_cluster_id step). And the last thing we ran into with Airflow (we are currently on version 2.7.3): unfortunately, the handling of trigger_rules for task groups, when mapping groups, works with some peculiarities, so we had to configure these values very carefully.
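The log-saving part is conceptually just an on_failure_callback that writes whatever we know about the failure to S3 before the ephemeral cluster (and its local logs) disappears. A simplified sketch with boto3; the bucket, the endpoint and what exactly gets copied are assumptions:

```python
import boto3


def save_failure_info_to_s3(context):
    """on_failure_callback: dump the failure context to S3 so that at least
    something survives the deletion of the ephemeral cluster."""
    ti = context["task_instance"]
    body = (
        f"dag_id={ti.dag_id}\n"
        f"task_id={ti.task_id}\n"
        f"run_id={ti.run_id}\n"
        f"try_number={ti.try_number}\n"
        f"exception={context.get('exception')}\n"
    )
    s3 = boto3.client("s3", endpoint_url="https://storage.yandexcloud.net")
    s3.put_object(
        Bucket="dwh-airflow-logs",  # placeholder bucket
        Key=f"failed/{ti.dag_id}/{ti.run_id}/{ti.task_id}.txt",
        Body=body.encode(),
    )


# attached to tasks via:
# default_args = {"on_failure_callback": save_failure_info_to_s3}
```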

Working with deltaLake

Historically, we have kept our tables in S3 in the deltaLake format (open source). What are the advantages in our case? Firstly, queries can use not only the statistics stored in parquet but also those in the deltaLog description (by default, statistics are collected for the first 30 fields). Secondly, there are procedures that automatically combine small files for faster reading. In addition, among the DataProc images in preview status in Yandex Cloud there is now an image with Spark 3.5.0, which, together with deltaLake, allows using the new Liquid Clustering partitioning technology. After various tests, we concluded that for some tables processing time with such partitioning dropped by up to a factor of 10; for other tables, no positive effect was found. In the near future, we will switch some data to Liquid Clustering.

A special feature of deltaLake is that data change logs are stored in json files, and to speed up processing, this json state is periodically consolidated into *.parquet files – the so-called checkpoint. This feature imposes certain restrictions on the simultaneous processing of the same table by different clusters, which is an important part of our strategy for parallelizing calculations. deltaLake has a solution to this problem in the form of multi-cluster writes. The original open-source deltaLake suggests doing this with DynamoDB; Yandex Cloud makes it possible to do it with a DynamoDB-compatible serverless database – YDB. Introducing the multi-cluster setup dramatically reduced both the resources we spend on calculations and the calculation time – all clusters began to work in parallel without errors when writing the log.
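Enabling the multi-cluster LogStore comes down to a few Spark configs. Below is a sketch based on the open-source S3DynamoDBLogStore settings; the coordination table name and region are placeholders, the delta-storage-s3-dynamodb jar must be on the classpath, and pointing the client at YDB's Document API endpoint may require Yandex-specific settings not shown here.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("delta-multicluster-writes")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # route all s3a:// Delta commits through the DynamoDB-compatible log store
    .config("spark.delta.logStore.s3a.impl",
            "io.delta.storage.S3DynamoDBLogStore")
    # coordination table in the DynamoDB-compatible database (YDB Document API)
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName",
            "delta_log_store")       # placeholder table name
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region",
            "ru-central1")
    .getOrCreate()
)

# with this in place, several clusters can write to the same Delta table
# without corrupting the _delta_log
```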

During our work we encountered an unexpected feature. In theory, deltaLake provides ACID guarantees, and the mechanism allows updating records with MERGE and UPDATE. However, testing showed that in our case the speed of these mechanisms in open-source deltaLake leaves much to be desired. So instead of them we update data with a window function – we simply take the latest record.
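The "take the latest record" trick is an ordinary window function. A sketch, assuming each row carries a business key and a load timestamp (all column names and paths are illustrative):

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("latest-record").getOrCreate()

raw = spark.read.format("delta").load("s3a://dwh-raw/sales_facts/")  # placeholder

# instead of MERGE/UPDATE: keep only the latest version of each business key
w = Window.partitionBy("order_id", "line_no").orderBy(F.col("load_ts").desc())

latest = (
    raw.withColumn("rn", F.row_number().over(w))
       .filter(F.col("rn") == 1)
       .drop("rn")
)

latest.write.format("delta").mode("overwrite").save("s3a://dwh-dds/sales_facts/")
```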

At the moment, Yandex Cloud has a DataProc image with Spark 3.5.0. This version allows using deltaLake 3.2.0 with the new Liquid Clustering type of partitioning. In theory, Liquid Clustering should give a performance boost. Our tests showed the following: on the one hand, some scripts accelerated from 25–30 minutes to 3–7 minutes; on the other hand, some scripts showed no gain. That is, Liquid Clustering partitioning is at least no worse than Hive-style partitioning. However, we did not go for a complete switch to Liquid Clustering, simply because the data calculated in Spark must also be loaded into Clickhouse. The selection algorithm that reads only one partition has been tested on large volumes of data (more on this below), while reading from deltaLake via the corresponding function has not yet been tested by us on the required volumes. In addition, it should be remembered that all data stored with Liquid Clustering must be regularly optimized for faster reading, which also takes time and resources. At the moment, we are in the process of switching to Spark 3.5.0 and plan to use Liquid Clustering on the RAW layer; perhaps in the future we will switch the DDS layer to it as well. As for file optimization with Hive partitioning, we have daily DAGs that delete old files (VACUUM) and weekly DAGs that optimize the current number of files (OPTIMIZE).
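The maintenance jobs themselves are one-liners on the Delta API; a sketch using the delta-spark Python bindings, with an illustrative path and retention period:

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("delta-maintenance")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

table = DeltaTable.forPath(spark, "s3a://dwh-dds/sales_facts/")  # placeholder path

# weekly DAG: compact small files for faster reads (OPTIMIZE)
table.optimize().executeCompaction()

# daily DAG: drop files no longer referenced by the log, keep 7 days of history (VACUUM)
table.vacuum(retentionHours=168)
```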

Upload to Clickhouse

The hardest part, oddly enough, was loading into Clickhouse; there are a lot of pitfalls there. For example, disk volumes. There were 2 types of disks in the cloud – local and network non-replicated (NRD). Local disks are mirrored, so for fault tolerance, only 2 hosts per shard are needed. It became clear fairly quickly that the disks fill up very fast, and a local disk cannot be larger than 3 TB; this meant we either needed more shards or different disks. NRD disks allowed volumes of up to 8 TB, which was not bad in our conditions. However, there were nuances: for fault tolerance, you need at least 3 hosts per shard and, most unpleasantly, NRD disks fail quite often. In theory, having 3 hosts per shard makes this non-critical, but as it turned out, when one of the shard's disks fails, Clickhouse issues a corresponding message during INSERT, and the driver simply treats such an incident as an error. We had not foreseen such situations and urgently had to rework all the Airflow DAGs to ignore this error.

Inserting a large volume of data into a sharded table. It is no secret that it is best to load data into a sharded Clickhouse table not through the Distributed table, but directly into each shard. To do this, we pick a more or less evenly distributed entity in the table (in our case it turned out to be the product ID) and simply take the remainder of dividing it by the number of shards; that remainder determines which shard the data goes to. Sharded aggregate tables with the ReplicatedAggregatingMergeTree engine we do load through a Distributed table.
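Schematically, the per-shard routing looks like the sketch below, here with clickhouse-driver; the host list, database, table and column names are placeholders (in our pipeline the data comes from Spark rather than from Python dicts).

```python
from clickhouse_driver import Client

# one entry point per shard (placeholders)
SHARD_HOSTS = ["ch-shard-0.internal", "ch-shard-1.internal", "ch-shard-2.internal"]
N_SHARDS = len(SHARD_HOSTS)


def insert_sharded(rows):
    """rows: iterable of dicts, each containing at least product_id."""
    buckets = {i: [] for i in range(N_SHARDS)}
    for row in rows:
        # the shard is the remainder of a more or less uniformly distributed ID
        buckets[row["product_id"] % N_SHARDS].append(row)

    for shard_id, shard_rows in buckets.items():
        if not shard_rows:
            continue
        client = Client(host=SHARD_HOSTS[shard_id])
        # insert into the *local* table on the shard, bypassing Distributed
        client.execute(
            "INSERT INTO dwh.sales_local (product_id, store_id, qty) VALUES",
            shard_rows,
        )
```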

Projections

Projections turned out to be a very convenient way to speed up queries. At the same time, as it turned out, in some cases projections were not used, or were used only if we added an extra condition to the WHERE clause. For example, a table contains only one year of data (say, 2023): if we added YEAR_ID = 2023 to the WHERE clause, the projections worked, but without that condition they did not. We never found out why. However, we found a workaround – we simply reduced the number of aggregation attributes in the projection (all our projections are groupings) and created several projections instead. After that, all projections began to be used much more consistently.
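For illustration, a grouping projection with a reduced set of aggregation attributes might be created like this (executed here through clickhouse-driver; the table and columns are hypothetical):

```python
from clickhouse_driver import Client

client = Client(host="ch-shard-0.internal")  # placeholder host

# a grouping projection over a small set of attributes
client.execute("""
    ALTER TABLE dwh.sales_local
    ADD PROJECTION sales_by_store_date
    (
        SELECT store_id, sale_date, sum(qty), sum(amount)
        GROUP BY store_id, sale_date
    )
""")

# build the projection for the parts that already exist
client.execute("ALTER TABLE dwh.sales_local MATERIALIZE PROJECTION sales_by_store_date")
```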

As for the load itself from deltaLake into Clickhouse

Clickhouse has a table function for accessing deltaLake tables, but until recently it did not work correctly with partitioned tables. For this reason, we used another function to load data – s3Cluster, which accesses files directly. To have an up-to-date list of the current files in a deltaLake table (the format stores the full history of changes), the necessary deltaLake tables were given the property of automatically generating manifests after each change; in deltaLake, a manifest is simply a text file listing the current *.parquet files. During loading, an SQL query opened this manifest, read the list of files, and those files were read with s3Cluster.
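Roughly, the two sides of this look as follows; the table path, cluster name and the final query are illustrative of the idea rather than our exact code.

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("manifest-generation").getOrCreate()

path = "s3a://dwh-dds/sales_facts/"  # placeholder path

# keep the manifest up to date automatically after every change to the table
spark.sql(f"""
    ALTER TABLE delta.`{path}`
    SET TBLPROPERTIES (delta.compatibility.symlinkFormatManifest.enabled = true)
""")
# or generate it explicitly once
DeltaTable.forPath(spark, path).generate("symlink_format_manifest")

# the manifest under _symlink_format_manifest/ is a plain text file listing the
# current *.parquet files; on the Clickhouse side each listed file is then read
# with s3Cluster, along the lines of:
ch_query_template = """
INSERT INTO dwh.sales
SELECT *
FROM s3Cluster('main_cluster', '{parquet_file_url}', 'Parquet')
"""
```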

In general, this is the main list of problems we encountered while moving our data to the cloud. Of course, there were also infrastructure problems, which are being solved one way or another by the professional team at Yandex – for example, with version updates, or with configuring and using Superset as the BI tool.

The purpose of this article was simply to share ways of addressing the issues that arise in Data Engineering when moving to the cloud. I felt this could save time and frustration for those who are currently going down a similar path.
