Stress-free DAGs: Our Experience of Using Metadata with Apache Airflow

Apache Airflow is a simple and convenient batch-oriented tool for building, scheduling and monitoring data pipelines. Its key feature is that, using Python code and built-in operator blocks, you can connect many of the technologies used in the modern data stack. The main working entity in Airflow is the DAG, a directed acyclic graph whose nodes are tasks and whose directed edges represent the dependencies between them.

Those who use Apache Airflow to orchestrate data loading into a warehouse will appreciate the flexibility it provides for templated jobs, where the whole development process comes down to filling in a configuration file that describes the DAG parameters and the list of tasks to run. At Leroy Merlin, this approach is already used successfully to create jobs that transfer data from the raw layer to the ODS layer of the warehouse, so we decided to extend it to the jobs that populate data marts.

The main difficulty was that we did not yet have a unified methodology for developing data marts and the procedures that populate them: each developer solved the problem based on personal preference and experience. This fits one of our main corporate IT principles, “You build it – you run it”, which means that the developer is responsible for their solution and supports it themselves. That principle is good for quickly testing hypotheses, but for repetitive work of the same kind, a standard solution is a better fit.

How it was

It is worth describing how development for loading data marts used to be carried out. The developer would write the load procedures in Greenplum, develop DAGs to launch them, create a new GitHub repository from a template, push the DAG code there, and add the repository to the main Airflow project as a submodule. This approach came with the following difficulties:

  1. Developers had to dive into Python and Apache Airflow;

  2. At the start of development, the main project was released only once a week, so you had to wait to see your DAGs in production Airflow;

  3. The main project grew gradually and began to slow down during deployment;

  4. DAG code for similar tasks was scattered across different repositories and was difficult to manage;

  5. The lack of a unified approach also affected the quality of the SQL code in the procedures. We often came across complex logic for managing load parameters that could easily have been offloaded to Airflow.

All of the above led us to the idea that it was time to take control of the situation and develop a standard solution. Analysis of the existing DAGs showed that most of them are very simple, contain no complex dependencies, and consist mainly of DummyOperators and PostgresOperators. This became the starting point for a new tool, which had to:

  1. Create DAGs from a configuration file in YAML format that specifies the main parameters: start date, schedule, database connection parameters, the names of the procedures to run, their parameters, and so on. The YAML files are stored in the corporate metadata management service, and their contents can be fetched through its API;

  2. Be as simple as possible and have clear documentation, so that getting up to speed does not take much time;

  3. At the same time, be as flexible as possible and support as many of Airflow's DAG settings as possible.

How it is now

The result is approximately the following template for the configuration file (the names and values below are purely illustrative):
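```yaml
# Illustrative example only: the field names follow the parameters described below,
# but all values (and the task type names) are hypothetical
module_name: sales_mart
pool: greenplum_pool
queue: default
owner: data_engineer
postgres_conn_id: greenplum_conn
email:
  - dwh-team@example.com
tags:
  - sales
  - mart
access_control:
  role_data_marts:
    - can_read
    - can_edit
schedule_interval: "0 3 * * *"
start_date: 2023-01-01
catchup: False
schema_name: sales_mart
task_list:
  - task_name: start
    task_type: dummy
  - task_name: load_sales_daily
    task_type: normal
    procedure_name: f_load_sales_daily
    params:
      p_load_date: "{{ ds }}"
    task_depends_on:
      - start
  - task_name: end
    task_type: dummy
    task_depends_on:
      - load_sales_daily
```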

From this file, the DAG itself is generated automatically.
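To give an idea of the mechanics, here is a simplified sketch of what such a generator might look like. It is an illustration only: the dag_id naming convention, the helper that fetches the YAML from the metadata service, and the task type names are all assumptions rather than our actual code.

```python
# A minimal sketch of the idea, not the actual implementation.
# get_config_from_metadata_service() is a hypothetical stand-in for the call to the
# corporate metadata service API; the dag_id convention and type names are assumptions.
import pendulum
import yaml

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator


def build_dag(config: dict) -> DAG:
    dag = DAG(
        dag_id=f"mart_{config['module_name']}",
        schedule_interval=config["schedule_interval"],
        start_date=pendulum.parse(str(config["start_date"])),
        catchup=config.get("catchup", False),
        tags=config.get("tags"),
        access_control=config.get("access_control"),
        default_args={
            "owner": config["owner"],
            "pool": config["pool"],
            "queue": config["queue"],
            "email": config.get("email"),
        },
    )

    tasks = {}
    for t in config["task_list"]:
        if t["task_type"] == "dummy":
            tasks[t["task_name"]] = DummyOperator(task_id=t["task_name"], dag=dag)
        else:  # normal load; the "multiple" type is omitted from this sketch
            schema = t.get("task_schema_name", config["schema_name"])
            args = ", ".join(f"{k} := '{v}'" for k, v in t.get("params", {}).items())
            tasks[t["task_name"]] = PostgresOperator(
                task_id=t["task_name"],
                postgres_conn_id=t.get("task_conn_id", config["postgres_conn_id"]),
                sql=f"SELECT {schema}.{t['procedure_name']}({args});",
                dag=dag,
            )

    # dependency wiring from task_depends_on is shown at the end of the article
    return dag


# config_text = get_config_from_metadata_service("sales_mart")
# dag = build_dag(yaml.safe_load(config_text))
# globals()[dag.dag_id] = dag  # register the generated DAG with Airflow
```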

Description of parameters

Common parameters:

  • module_name – needed to form DAG_ID;

  • pool – the pool in which the tasks will be launched;

  • queue – a queue for tasks;

  • owner – the owner of the DAG;

  • postgres_conn_id – the Airflow connection ID for the database;

  • email – list of emails for sending alerts;

  • tags – a list of tags for finding a DAG in the UI;

  • access_control – the role for managing the DAG;

  • schedule_interval – schedule for launching the DAG;

  • start_date and catchup are parameters that control how far back the load history goes. Airflow uses an interval approach: the period from start_date to the optional end_date (which we do not use) is split into intervals of the length specified by schedule_interval. If catchup is True, the DAG runs all intervals starting from start_date; if False, only the current interval (see the small example after this list);

  • schema_name – the database schema where the data mart is located;

  • task_list – list of tasks in the DAG.
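A small hypothetical illustration of the interval approach:

```yaml
start_date: 2023-01-01
schedule_interval: "@daily"
catchup: True    # on first deployment, backfill every daily interval since 2023-01-01
# catchup: False # run only the current (most recent) interval
```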

The main parameters of the tasks:

  • task_name – corresponds to the Airflow task_id;

  • task_type – the type of the task;

  • task_schema_name – the database schema in which the data mart is located, if it differs from the common one;

  • task_conn_id – the connection ID, if it differs from the common one;

  • procedure_name – the procedure that loads the data mart;

  • params – the list of procedure parameters and their values;

  • task_depends_on – the list of tasks on which this task depends;

  • priority_weight – the priority of this task relative to other tasks;

  • task_concurrency – the maximum number of simultaneously running instances of this task across all active DAG runs.

There are currently three types of tasks (task_type):

1) Dummy – corresponds to DummyOperator. A task that does nothing; it usually serves as the start or end task, or to split the tasks into blocks.

2) Normal load – corresponds to the PostgresOperator in Airflow.

The SQL generated for such a task might look roughly like this (the schema, procedure and parameter names continue the hypothetical example above):
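```sql
-- Hypothetical example: the operator simply calls the load procedure
-- with the parameters rendered from the params list
SELECT sales_mart.f_load_sales_daily(p_load_date := '2023-01-01');
```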

3) Multiple load – many PostgresOperators (used when you need to create a set of similar tasks that differ in a single parameter).

This type has its own specific parameters:

  • task_multiply – can take two values, “schema” or “params”. With “schema”, each value from task_multiply_list is added to the SEARCH_PATH expression. With “params”, each value from task_multiply_list is substituted into the procedure's parameter list for the parameter in params whose value is set to 'task_multiply_list' (see the example after this list);

  • task_multiply_list – the list of values over which the identical tasks are created.
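For illustration, two such task descriptions might look like this (the type name and all values are hypothetical):

```yaml
- task_name: load_sales_by_schema
  task_type: multiple
  procedure_name: f_load_sales
  task_multiply: schema
  task_multiply_list:
    - store_101
    - store_102

- task_name: load_sales_by_store
  task_type: multiple
  procedure_name: f_load_sales
  task_multiply: params
  task_multiply_list:
    - "101"
    - "102"
  params:
    p_store_id: task_multiply_list   # this parameter receives the values listed above
```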

The result is roughly the following SQL, continuing the hypothetical example above.

For “schema”:
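```sql
-- Hypothetical: one such task is generated per value in task_multiply_list
SET SEARCH_PATH TO store_101;
SELECT f_load_sales();
```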

For “params”:
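```sql
-- Hypothetical: one task per value, substituted for the marked parameter
SELECT sales_mart.f_load_sales(p_store_id := '101');
```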

And this is roughly how the dependencies between tasks are wired up (continuing the generator sketch above):
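```python
# Hypothetical continuation of the generator sketch above:
# each task is linked to the tasks listed in its task_depends_on
for t in config["task_list"]:
    for upstream_name in t.get("task_depends_on", []):
        tasks[upstream_name] >> tasks[t["task_name"]]
```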

Where we are going

Implementing the tool has significantly reduced DAG development time. You no longer need to dive deeply into Apache Airflow, although you will still have to read up on macros and scheduling. Filling in the configuration file template takes 10-15 minutes. The time spent on review and deployment to production has also dropped considerably. This is also where the main area for improvement lies: review and deployment are still done manually. We would like to cover the whole process with tests and let developers ship their DAGs to production themselves.
