How we made ETL resilient to constant changes in the input data structure

Typical planning dialogue:
Lead:
– Users ask to replicate these columns in these tables from the production database to the data lake.
Developer:
– When?
Lead:
– Yesterday.

A request becomes a task, the task goes into a sprint, and then comes manual research of the input data, preparing the mapping and migrations, verification, and deployment. A couple of sprints later the user finally receives the desired data. How can we speed this process up to, say, a few hours?

Hello everyone! My name is Semyon Putnikov and I am a data engineer at DINS. I work in a team that develops big data management and analysis tools for RingCentral. Under the cut is a story about how we solved the problem of frequent data migrations for our ETLs and now delight users with fast responses to their requests.


Formulation of the problem

Our story begins with an ETL that exports data from Salesforce to our cozy data lake. By enriching this data with information from other sources, we can, for example, analyze the inflow and outflow of users.

The peculiarity of working with this CRM system is the limits of the paid subscription: there is a cap on the number of exported objects and their fields. We need to export new fields and objects, check their business value, and then exclude those that did not prove useful. These review cycles have to happen on a tight schedule so that we keep getting fresh, relevant analytics and can develop the main product.

But how do you ship a release and update the ETL every two weeks? Hard-coding the mapping of the exported data meant we had to cut frequent releases with an updated configuration, and every one of them triggered the full verification and deployment process. This consumed a lot of resources and involved a lot of manual work. Not to mention that when you have to maintain a large fleet of ETL processes, spending that much time on a single project is simply wasteful and unrealistic.

Our overall strategy for addressing this challenge was to automate ETL configuration changes. Looking a little ahead, I will say that as users’ interest in the exported data grew, so did the number of update requests. But the chosen solution made it possible to answer them in a reasonable time.

Solution plan

The main goal is to shorten the time it takes to deliver data changes by reducing the amount of manual work in the development, verification, and deployment of releases.

But before discussing the architecture of the solution, it is necessary to tell a little more about the structure of the main ETL process.

The architecture of the ETL process is as follows:

  1. exporting data from a source,

  2. transformation,

  3. shipment to data lake,

  4. replication in data mart.

Each of the steps above needs meta-information about the input data: the first step uses it to select the required object fields, the second to convert the data to data lake types, and the third and fourth to migrate the target tables to the new fields.

Object configuration

An “object” in the context of this process is an artifact that contains all the information necessary to process a single table from a source. Let’s take a closer look.

To export data, the object must tell the ETL the name of the source table and the names of the desired fields. You can also optionally add information, for example, about the export type: incremental or full. The configuration then looks something like this:

{
    "source_object_name": "users",
    "export_type": "incremental",
    "fields":
        [
            {"source_name": "user_id"},
            {"source_name": "login"},
            {"source_name": "company_id"}
            ...
        ]
}
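As a rough illustration, the export step can build its query directly from such a configuration. The sketch below is a simplified assumption, not our production code: the helper name and the column used for the incremental filter are made up for the example.

import json

# Minimal, hypothetical sketch: turn an object configuration into a SELECT
# against the source table. The watermark column for incremental export is
# illustrative only.
def build_export_query(config, watermark=None):
    columns = ", ".join(field["source_name"] for field in config["fields"])
    query = "SELECT {} FROM {}".format(columns, config["source_object_name"])
    if config.get("export_type") == "incremental" and watermark is not None:
        query += " WHERE last_modified_date > '{}'".format(watermark)
    return query

config = json.loads("""
{
    "source_object_name": "users",
    "export_type": "incremental",
    "fields": [
        {"source_name": "user_id"},
        {"source_name": "login"},
        {"source_name": "company_id"}
    ]
}
""")
print(build_export_query(config, watermark="2021-01-01T00:00:00Z"))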

At the transformation stage, the ETL maps fields to their types; this information is also taken from the object. Let’s add it to our configuration:

{
    "source_object_name": "users",
    "export_type": "incremental",
    "fields":
        [
            {"source_name": "user_id", "datatype": "bigint"},
            {"source_name": "login", "datatype": "string"},
            {"source_name": "company_id", "datatype": "bigint"}
            ...
        ]
}
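Assuming the raw values arrive untyped, a simplified transformation step might cast them according to the declared datatypes. The converter table below is an illustration, not our full type system:

# Illustrative mapping from configuration datatypes to Python-side converters.
CASTS = {
    "bigint": int,
    "string": str,
}

def transform_row(raw_row, config):
    # Cast each configured field of a raw source row to its declared datatype,
    # keeping None values as-is.
    transformed = {}
    for field in config["fields"]:
        value = raw_row.get(field["source_name"])
        cast = CASTS[field["datatype"]]
        transformed[field["source_name"]] = None if value is None else cast(value)
    return transformed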

To pack the exported data neatly, the target tables have to be prepared: the schema, partitioning, and other properties configured. Part of the table schema can be derived from the meta-information we already have, namely the data types we filled in at the previous step. However, we also want to keep the ability to rename a field in the target database. Let’s add this information to the object’s configuration.

{
    "source_object_name": "users",
    "target_object_name": "users_stage",
    "export_type": "incremental",
    "fields":
        [
            {"source_name": "user_id", "target_name": "user_id_stage", "datatype": "bigint"},
            {"source_name": "login", "target_name": "login_stage", "datatype": "string"},
            {"source_name": "company_id", "target_name": "company_id_stage", "datatype": "bigint"}
            ...
        ]
}
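With target names and datatypes in place, the schema of a target table can be derived straight from the configuration. Here is a simplified sketch that ignores partitioning and storage formats:

def build_create_table(config):
    # Derive a CREATE TABLE statement for the target table from the object
    # configuration; partitioning and other table properties are omitted here.
    columns = ",\n    ".join(
        "{} {}".format(field["target_name"], field["datatype"])
        for field in config["fields"]
    )
    return "CREATE TABLE IF NOT EXISTS {} (\n    {}\n)".format(
        config["target_object_name"], columns
    )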

We decided to store the configuration in JSON, but this is not essential to the main task, and the format can be swapped for another. The configuration can also be extended with any parameters specific to an individual process, which makes the concept portable across different use cases.

To make the ETL independent of the input data’s metadata, we moved the object configurations for this process into a separate artifact with its own release and verification cycle. We call it the “ETL configuration”.

From words to deeds

The first architectural component is the config applier. It converts the serialized object configuration into a relational model and loads it into the database. We use PostgreSQL for this.

Configurations do not overwrite each other but are versioned, which lets us keep a history of changes. Before each launch, the ETL checks the metadata in the database and picks up the current configuration.
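To give an idea of what picking up the current configuration before a launch can look like, here is a sketch against a hypothetical pair of tables, object_config and object_config_field, keyed by object name and version number; the real schema is not shown in this article.

import psycopg2

def load_current_config(conn, object_name):
    # Read the latest configuration version of an object from the metadata
    # database (hypothetical tables: object_config, object_config_field).
    with conn.cursor() as cur:
        cur.execute(
            "SELECT max(version) FROM object_config WHERE object_name = %s",
            (object_name,),
        )
        version = cur.fetchone()[0]
        cur.execute(
            "SELECT source_name, target_name, datatype "
            "FROM object_config_field WHERE object_name = %s AND version = %s",
            (object_name, version),
        )
        fields = [
            {"source_name": s, "target_name": t, "datatype": d}
            for s, t, d in cur.fetchall()
        ]
    return {"source_object_name": object_name, "version": version, "fields": fields}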

In addition to its main task, at this stage the config applier validates the configuration against a set of accepted rules.

The first rule of the good-configs club is that every field of the old configuration must also be present in the new one (a field can, however, be marked as no longer exported). This rule protects us from having to migrate the schema of historical data: the target table can always tell where data written with the old schema should go, without name collisions.

The second rule is that each configuration version must contain the minimum required set of parameters for the process. This is where non-viable versions that could break the process are filtered out.

Optionally, you can also check for typos in parameters that take a fixed set of values.
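Put together, the three checks could look roughly like this; the required-parameter list and the allowed export types below are illustrative, not our exact rule set.

REQUIRED_KEYS = {"source_object_name", "target_object_name", "export_type", "fields"}
ALLOWED_EXPORT_TYPES = {"incremental", "full"}  # an example of a fixed-value parameter

def validate_config(new, old):
    # Return a list of human-readable violations; an empty list means valid.
    errors = []
    # Rule 1: the new version must keep every field of the old one.
    old_fields = {f["source_name"] for f in old.get("fields", [])}
    new_fields = {f["source_name"] for f in new.get("fields", [])}
    removed = old_fields - new_fields
    if removed:
        errors.append("fields removed from configuration: {}".format(sorted(removed)))
    # Rule 2: every version must carry the minimum required set of parameters.
    missing = REQUIRED_KEYS - new.keys()
    if missing:
        errors.append("missing required parameters: {}".format(sorted(missing)))
    # Optional check: guard against typos in fixed-value parameters.
    if new.get("export_type") not in ALLOWED_EXPORT_TYPES:
        errors.append("unknown export_type: {}".format(new.get("export_type")))
    return errors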

Once the checks pass, the JSON is turned into a set of INSERT statements and sent to the database.
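The writing side of the config applier, against the same hypothetical tables as above, might look like this; a real implementation would wrap everything in a single transaction, which psycopg2’s connection context manager provides here.

def apply_config(conn, config, new_version):
    # Write a validated configuration into the metadata database as a new version.
    # "with conn" commits on success and rolls back on error.
    with conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO object_config (object_name, version, export_type) "
            "VALUES (%s, %s, %s)",
            (config["source_object_name"], new_version, config["export_type"]),
        )
        for field in config["fields"]:
            cur.execute(
                "INSERT INTO object_config_field "
                "(object_name, version, source_name, target_name, datatype) "
                "VALUES (%s, %s, %s, %s, %s)",
                (config["source_object_name"], new_version,
                 field["source_name"], field["target_name"], field["datatype"]),
            )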

Once the new configuration sits in the relational model and is ready for use, it is time for the second application, the config migrator.

The table migration logic is based on comparing different versions of a configuration in the database. The application pulls the latest and the previous version of an object’s configuration and builds a migration script from their differences.

In the current version, as I mentioned above, we do not consider the possibility of removing fields from the object, since this would require a back migration of all historical data, which is quite expensive.

Let’s walk through this application step by step. First, it compares the configuration versions and gets a list of changes. Then it generates SQL scripts of the “ALTER TABLE ADD COLUMN” form for the target tables, both in the data lake and in the data mart. Once migration scripts have been generated for all tables, the application executes them one by one.
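The core of the comparison could look like the sketch below. In line with the first validation rule, only added fields are handled, and the generic ADD COLUMN form is used; dialects differ in practice (Hive, for example, expects ADD COLUMNS (...)).

def build_migration(table, old, new):
    # Compare two configuration versions and emit ALTER TABLE statements for
    # fields that appear only in the new one. The generic "ADD COLUMN" form is
    # used here; Hive needs the "ADD COLUMNS (...)" variant instead.
    old_fields = {f["source_name"] for f in old["fields"]}
    statements = []
    for field in new["fields"]:
        if field["source_name"] not in old_fields:
            statements.append(
                "ALTER TABLE {} ADD COLUMN {} {}".format(
                    table, field["target_name"], field["datatype"]
                )
            )
    return statements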

The main difficulty here is dealing with the peculiarities of schema changes in individual databases. For example, Apache Hive has no DROP COLUMN operation; instead, we have to use REPLACE COLUMNS with the old schema to roll back to the previous version.
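A rollback statement of that kind can be rebuilt from the previous configuration version, roughly like this:

def build_hive_rollback(table, previous):
    # Roll a Hive table back to the previous schema: REPLACE COLUMNS rewrites
    # the column list, effectively dropping columns added since that version.
    columns = ", ".join(
        "{} {}".format(f["target_name"], f["datatype"]) for f in previous["fields"]
    )
    return "ALTER TABLE {} REPLACE COLUMNS ({})".format(table, columns)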

Sequential execution of migration scripts allows us to monitor the state of target tables and quickly catch incorrect behavior during migration.

Implementation

We rolled out the solution in stages. In the first stage, the object configurations were rewritten in the new format and the ETL learned to read configurations from the database. In the second, we automated rolling the configuration out to the database by creating the config applier. The final stage was building the migration application.

Dividing the implementation process into stages allowed us to immediately benefit from the new architecture. In addition, the successes of the initial stages made it possible to obtain resources for further development.

Plans for the future

The first point of improvement is to encapsulate all the configuration logic in a separate library to facilitate porting to other projects.

The second point is to move the table migration process from an external application into the ETL itself, so that checking for changes and running the migrations becomes a preparatory stage of every pipeline launch. This way, scheduled pipelines would not have to be stopped for a blocking migration run by an external process. At the moment, however, this approach is unstable because of the specifics of the databases we use.

Outcomes

We went from dozens of hours of manual work, which took time away from more interesting tasks, to a couple of hours per sprint spent adding a few lines to the JSON configurations.

The verification process has become more transparent, since all automation components can be tested independently of the object configurations themselves, and the configurations are in turn validated by the config applier’s checks.
