Case Study for a metallurgical company
Hi all! My name is Amir Rozikzoda, and I am a Data Engineer at DUK Technologies. I would like to talk about a project I worked on as a co-author, under the guidance of my colleague Arthur Khusnutdinov, also a Data Engineer at DUK Technologies.
Relevance
Data analytics and analytical data marts are the source of the reporting on which strategic management decisions are based. However, there are no ready-made frameworks on the market that fully cover the need for data extraction.
Of course, there is Apache NiFi, but it poses many problems when working with large amounts of data. The combination of Python and Apache Airflow today is one of the best practices in the field of data management, not only for data orchestration, but also for extraction, so it is logical to develop an ETL system (Extract, Transform, Load) on top of Airflow. This allows you to efficiently manage the processes of data extraction, transformation and loading, providing reliability and flexibility in the analytical infrastructure.
Our customer, a large metallurgical company with many branches, needed a simple solution that would speed up data extraction from various heterogeneous sources and the analytics built on top of it. At the same time, the solution had to be flexible enough to extend the loading functionality.
It was possible to solve the client's problem in the classic way, by writing custom code, but then development would have taken about three months. We didn't have that much time, so we decided to develop a framework to speed up and simplify development.
The framework was supposed to combine different code bases and create a universal approach to solving similar problems, while reducing the number of code duplicates.
I will formulate two problems with which the company approached us.
Problem 1: The company has many branches, the branches have managers, the managers have KPIs and the dynamics of these KPIs, which are discussed at planning meetings.
Managers need data. The problem with consolidating the data is that there is no single storage on which to build analytical dashboards. For example, a dashboard with the amount of copper smelted per shift. Or a personnel dashboard: who is currently working, who is on vacation, what the staff turnover is. Or a dashboard with the number of railcars of iron shipped to the different branches.
Problem 2: Many disparate data sources.
For example, one plant has data on metal smelting in MySQL, and data on employees is stored in 1C:ZiK. Another plant has a manual data entry system for metal smelting, and personal files of employees are kept in a Postgres DB. In addition, all plants enter data on initiatives into a custom manual input system with upload via API.
Based on these problems, we formulated a task consisting of two components.
Collect, cleanse and enrich data for BI analytics and management decision-making:
Create pipelines for loading data from many different sources.
Create data marts for executives and stakeholders.
The following technology stack was chosen:
Database – Greenplum, an open-source massively parallel processing (MPP) database designed specifically for building a DWH (Data Warehouse).
Orchestrator – Apache Airflow.
To store intermediate data during loading, we deployed a MinIO instance – a self-hosted object storage.
Data extraction – code written in Python, a native and flexible solution in combination with Apache Airflow.
After analyzing the problem, we decided to create a framework for generating DAGs (DAG – directed acyclic graph): functions for extracting and transforming data from a specific source are developed once and then reused through the framework.
What functions have been developed:
A universal function for extraction from an API: token authorization, two-step authorization, or no authorization.
A universal function for extraction from any database: retrieving a specific table, specific columns of a table, or passing raw SQL that returns a result.
A universal function for reading from and writing to S3 object storage. S3 is also used to store intermediate files.
Functions for expanding arrays and objects into a flat structure for writing them to a relational database.
A function for enrichment with technical metadata: source code ID, record load ID, record load time.
A function for generating a parquet file from the resulting flat data.
Function for loading flat data onto the STG (staging data) layer using the Greenplum PXF utility.
A set of functions for loading data from the STG layer (temporary data storage layer) to the ODS layer (layer for permanent data storage in the source structure).
A pipeline description configuration and a configurator that validates and generates pipeline code were also developed.
The developed functions cover all of the client's cases and allow data loading pipelines to be created quickly. We have standardized most business tasks, and reusing them reduces development time.
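To give a feel for these typical functions, below is a minimal sketch of a universal API-extraction step that lands raw JSON in S3-compatible storage (MinIO). It is an illustration only: the function name extract_api_to_s3, its parameters and the auth_mode values are assumptions rather than the framework's real API, and it relies on the requests and boto3 libraries.

import json

import boto3
import requests


def extract_api_to_s3(url, bucket, key, auth_mode="none", token=None,
                      login_url=None, credentials=None, s3_endpoint=None):
    """Pull raw JSON from an API and land it in S3-compatible storage (MinIO)."""
    headers = {}
    if auth_mode == "token" and token:
        # Static token supplied through the configuration.
        headers["Authorization"] = f"Bearer {token}"
    elif auth_mode == "two_step":
        # Two-step authorization: exchange credentials for a short-lived token first.
        auth_resp = requests.post(login_url, json=credentials, timeout=60)
        auth_resp.raise_for_status()
        headers["Authorization"] = f"Bearer {auth_resp.json()['access_token']}"

    response = requests.get(url, headers=headers, timeout=300)
    response.raise_for_status()

    # Land the raw payload in object storage for the next pipeline step.
    s3 = boto3.client("s3", endpoint_url=s3_endpoint)
    s3.put_object(Bucket=bucket, Key=key,
                  Body=json.dumps(response.json()).encode("utf-8"))
    return key

In the framework itself, the parameters of such functions come from the DAG configuration described below.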
Description of architecture modules
Framework
The DAG synchronizer parses the DAG configuration file in Git, validates it for errors, and automatically generates a new DAG consisting of the steps described in the configuration file. The newly created DAG is placed in Git in the develop branch, in the folder with all DAGs (airflow-dags), from where it is replicated to the Airflow web server and workers and becomes visible in the Airflow Web UI.
Typical functions
A set of the necessary functions was implemented for extraction (from APIs, databases, S3, FTP), transformation and enrichment of data; these functions are used by the generated DAGs.
DAG configuration format
A DAG configuration format has been developed that can describe a pipeline for extracting data from any source. Each configuration describes one DAG that extracts data from one source; in it we describe the sequence of steps used to extract and transform the data. Each step has many options and parameters depending on the source we are working with and how we extract the data.
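For illustration, a configuration for the 'ideas' source described later in the article might look roughly like the JSON below. The field names follow the Pydantic models shown further on; the values, the task type names and the input/output keys are invented, and only two of the four tasks are shown for brevity.

{
  "dag_id": "ideas_api_to_ods",
  "schedule_interval": "0 2 * * *",
  "dag_type": "api",
  "env": "prod",
  "types_path": "mappers/ideas_types.json",
  "source_code_id": 101,
  "source_system_integration_type_id": 1,
  "tasks_flow": ["api_s3_raw_ideas_json", "s3_raw_s3_convflat_ideas_parquet",
                 "s3_convflat_stg_ideas", "stg_ods_ideas"],
  "tasks": [
    {
      "name": "api_s3_raw_ideas_json",
      "type": "api_extract",
      "input": {"url": "https://example.com/api/v1/ideas", "auth": "token"},
      "output": {"bucket": "raw", "path": "ideas/ideas.json"}
    },
    {
      "name": "s3_convflat_stg_ideas",
      "type": "pxf_load",
      "input": {"bucket": "convflat", "path": "ideas/ideas.parquet"},
      "output": {"schema": "stg", "table": "ideas"}
    }
  ]
}

Each task corresponds to one step of the pipeline; after validation, the configurator turns such a file into a DAG.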
Configuration Validation
Based on the configuration format, models were created using the Pydantic library; this allowed us to validate the resulting configuration.
Generating DAGs from the configuration
Based on these models and the Jinja template engine, we implemented generation of the final DAGs.
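A minimal sketch of this generation step, assuming the jinja2 library and a template stored at templates/dag_template.py.j2 (both the path and the template name are made up):

import pathlib

from jinja2 import Environment, FileSystemLoader


def generate_dag_file(config, output_dir="airflow-dags"):
    """Render the final DAG .py file from a validated configuration object."""
    env = Environment(loader=FileSystemLoader("templates"))
    template = env.get_template("dag_template.py.j2")
    rendered = template.render(
        dag_id=config.dag_id,
        schedule_interval=config.schedule_interval,
        tasks=config.tasks,
        tasks_flow=config.tasks_flow,
    )
    out_path = pathlib.Path(output_dir) / f"{config.dag_id}.py"
    out_path.write_text(rendered, encoding="utf-8")
    return out_path

In such a scheme, the template would contain the DAG boilerplate (imports, the DAG object, an operator per step) with placeholders for the values rendered above.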
Metadata storage module for generating ETL processes
The module stores the JSON file of the ETL-fw configurator, the source data type mapper file, and the set of typical framework functions used during data processing (by the created DAG). The set of typical functions will be expanded as the framework develops. The configurator's JSON file contains the main description of the DAG, its parallel and sequential steps, a link to the data type mapper file for processing the source system's entities, as well as links to the typical functions used in this DAG (extracting data from API sources, JDBC-like integrations, loading into S3, converting data types, unfolding parquet into a flat structure, creating an STG table in Greenplum, loading into the ODS layer in Greenplum).
ETL Process Execution Module
This is where the main launch and execution of DAG instances takes place, within which data is loaded from sources into data marts (or into the ODS layer, depending on the purpose). The DAG synchronizer uses the created functions and puts them in the right order with the required parameters.
Metadata storage module for launching ETL processes
Within the module, all metadata on DAG and step launches, as well as any errors that occur, is stored. Each DAG and each framework step logs its launch metadata. The metadata of all processes will later be visualized for error analysis and alerting for operational monitoring.
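As an illustration of what such logging could look like (the meta table etl_meta.step_runs and its columns are invented for this sketch), each step could call a small helper like this:

from datetime import datetime, timezone

import psycopg2


def log_step_run(conn_params, dag_id, step_name, status, error_message=None):
    """Write one launch-metadata record for a DAG step into the meta database."""
    conn = psycopg2.connect(**conn_params)
    try:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO etl_meta.step_runs "
                "(dag_id, step_name, status, error_message, logged_at) "
                "VALUES (%s, %s, %s, %s, %s)",
                (dag_id, step_name, status, error_message,
                 datetime.now(timezone.utc)),
            )
        conn.commit()
    finally:
        conn.close()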
Module for visualization and analysis of ETL process launches and errors
The module is responsible for visualization and analysis of running instances of DAGs and steps,
process execution errors. It is assumed that for operational monitoring
you will need to configure alerting.
Framework testing
Characteristics of the test bench on which testing was carried out:
1 scheduler (4 vCPU / 8 GB RAM)
3 workers (6 vCPU / 28 GB RAM)
Postgres meta DB (8 vCPU / 8 GB RAM).
As part of testing, we worked with several entities: ideas, tags, KPIs, and goals:
An idea is a business proposal or hypothesis from managers. An idea has a set of values and monthly effects: how much was smelted, how much was spent, what plan is expected, what the idea's deadline is, and which branches are affected.
Tags – what the idea refers to.
Goals – what are the intended goals for this idea.
KPI – numerical values of goal indicators.
The process of filling out metadata based on the framework and generating a DAG:
Extracting data from the API: we filled in the configuration or used a custom function to extend the functionality.
Expanding the resulting objects into a flat structure and enriching them with technical metadata (a rough sketch of this step is shown after this list).
Loading the data into the temporary storage layer.
Loading into the persistent data storage layer.
DAG generation.
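A rough sketch of the flattening-and-enrichment step mentioned above, assuming pandas is used for flattening and parquet writing; the function name and the technical column names are illustrative:

import uuid
from datetime import datetime, timezone

import pandas as pd


def flatten_and_enrich(records, source_code_id, parquet_path):
    """Expand nested JSON records into a flat table, add technical metadata
    and write the result as a parquet file for the PXF load step."""
    # json_normalize unfolds nested objects into flat, underscore-joined columns;
    # nested arrays can be expanded via its record_path argument.
    df = pd.json_normalize(records, sep="_")

    # Technical metadata: source ID, load run ID, load timestamp.
    df["source_code_id"] = source_code_id
    df["load_id"] = str(uuid.uuid4())
    df["load_dttm"] = datetime.now(timezone.utc)

    # Writing parquet requires pyarrow or fastparquet to be installed.
    df.to_parquet(parquet_path, index=False)
    return parquet_path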
Result of DAG generation:
DAG running process:
api_s3_raw_ideas_json: 180-240 seconds – extracting raw data from the source and landing it in S3 storage.
s3_raw_s3_convflat_ideas_parquet: 5-10 seconds – converting to a flat structure and enriching with metadata.
s3_convflat_stg_ideas: 5-10 seconds – loading the flat data into the temporary storage layer using the PXF utility.
stg_ods_ideas: 20-30 seconds – loading data from the temporary layer to the permanent one using various methods (SCD0-2, snapshots).
Results of the DAG work:
Development time: previously 5-6 days per source; with the framework, 2-5 hours per source.
Run time: 10-15 minutes, depending on the volume of data at the source.
PXF as the fastest way to load data into Greenplum in parallel
Without Greenplum PXF, we would have to keep everything in memory and generate INSERT statements. The main problem with this approach is that the Python script is guaranteed to hang on a couple of gigabytes of data.
If extraction is done in parts, separating transformation and loading, the loading stage can take 30-60 minutes. For some sources, the customer required that loading take no more than 30 minutes.
Therefore, we decided to use a combination of Greenplum PXF and parquet files. Compared with loading over JDBC, PXF is optimized for Greenplum, as it loads data onto the segments in parallel, directly from the source. This allowed us to reduce loading time from 30-60 minutes to 2-3 minutes.
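For illustration, the STG load through PXF boils down to an external table over the parquet file in object storage plus an INSERT ... SELECT. In the sketch below, the schema, table, bucket and PXF server names are made up and the column list is shortened:

import psycopg2

PXF_DDL = """
CREATE READABLE EXTERNAL TABLE stg.ideas_ext (
    idea_id        BIGINT,
    idea_name      TEXT,
    source_code_id INT,
    load_dttm      TIMESTAMP
)
LOCATION ('pxf://raw/ideas/ideas.parquet?PROFILE=s3:parquet&SERVER=minio')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
"""


def load_stg_via_pxf(conn_params):
    """Create a PXF external table over the parquet file and load it into STG.
    Greenplum segments read the file from object storage in parallel."""
    conn = psycopg2.connect(**conn_params)
    try:
        with conn.cursor() as cur:
            cur.execute("DROP EXTERNAL TABLE IF EXISTS stg.ideas_ext;")
            cur.execute(PXF_DDL)
            cur.execute("INSERT INTO stg.ideas SELECT * FROM stg.ideas_ext;")
        conn.commit()
    finally:
        conn.close()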
Configuration Validation
To validate the configuration, we used the Pydantic library: it validates any data, but its main specialization is JSON validation. A big plus of Pydantic is that it allows you to write custom validation.
Example of a DAG configuration model
from typing import List, Optional, Union

import pydantic

# DagType, TriggerRule, ApiInput, DBInput, S3Input, DBOutput, S3Output and
# Transformation are other models and enums of the framework, omitted here.


class DagTask(pydantic.BaseModel):
    name: str
    type: str
    trigger_rule: TriggerRule = TriggerRule.ALL_SUCCESS
    input: Union[ApiInput, DBInput, S3Input]
    output: Union[DBOutput, S3Output]
    transformation: Optional[Transformation] = None


class DagConfig(pydantic.BaseModel):
    schedule_interval: str
    tasks_flow: List[str]
    dag_type: DagType
    dag_id: str
    types_path: str
    env: str
    source_code_id: int
    source_system_integration_type_id: int
    tasks: List[DagTask]
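A minimal example of how the configurator might use these models; the function and file names are illustrative, and the models above are assumed to be importable:

import json

import pydantic


def load_and_validate(config_path):
    """Parse the JSON configuration and fail fast on any schema violation."""
    with open(config_path, encoding="utf-8") as f:
        raw = json.load(f)
    try:
        return DagConfig(**raw)
    except pydantic.ValidationError as err:
        # All field errors surface at generation time, before any DAG code is produced.
        raise RuntimeError(f"Invalid DAG configuration {config_path}:\n{err}") from err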
Results and performance metrics
The first development of a DAG for a source is labor-intensive. After that, the ready-made framework speeds up development significantly.
What are the results after implementing the framework:
The number of errors has decreased. Configuration validation now cuts off basic errors at the generation stage. Thanks to validation, we freed up an average of 4 developer hours per integration that were previously spent on debugging.
Code uniformity has increased thanks to typing; all pipeline code is now generated by the configurator.
The framework can be easily expanded: you can create new functions or expand existing ones.
Testing, debugging, and support for current functionality have been simplified: fixing an error in one place corrects it in all others.
Improved developer UX.
As a result, we have saved data engineers' time: the average time to develop an integration with a source used to be 24-48 hours; now it is 6-12 hours. The time for developing, supporting and extending data flows has been reduced accordingly.
What are the next development plans:
Create a web interface that allows DAGs to be assembled from a set of drop-down lists. The goal is for system analysts to be able to create data loading pipelines on their own, using the framework's parameter structure, without the help of data engineers.
Add functions for retrieving data from other sources: FTP, CSV uploads from a client computer.
Add notifications to email or instant messengers about successful and failed runs.