Designing Data Pipelines in Apache Airflow

This article gives a brief introduction to Airflow and the steps to create and configure data pipelines. First, we will install and configure Airflow. Then we will walk through a practical example of creating and running a DAG. Our goal today is a practical understanding of Airflow deployment and basic DAG development.

A little about DataOps

The DataOps methodology is designed to enable an organization to build and deploy analytics and data pipelines using an iterative process. By following data and model management practices, organizations can provide high-quality enterprise data for their AI applications.

In other words, the practice of DataOps allows you to transfer the experience of DevOps to data management and analytics. Experience shows that deploying DataOps effectively speeds time to market for analytics solutions, improves data quality and compliance, and reduces data management costs.

A little about Airflow

Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows, with each workflow defined in Python as a DAG (directed acyclic graph) of tasks. In this step, we will install the Apache Airflow Python package in our environment and initialize the configuration.

Install the Airflow package by running the following command in the terminal:

pip install "apache-airflow==2.3.0" --ignore-installed

Note that we are pinning a specific version of Airflow and using the --ignore-installed flag to avoid version conflicts with package dependencies.
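To confirm that the pinned version landed in the active environment, a quick check from Python is:

import airflow
print(airflow.__version__)  # should print 2.3.0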

Airflow database initialization

Airflow uses a relational database as its backend to store metadata (configuration, DAG runs, task states). By default this is a SQLite database stored in ~/airflow/airflow.db. We initialize the database in our environment by running the following command in the terminal:

airflow db init

Create an admin user

Next, we need to create a user that can log into the Airflow UI. Enter the following in the terminal to create a user named admin with administrator rights:

airflow users create \
    --username admin \
    --firstname Firstname \
    --lastname Lastname \
    --role Admin \
    --email admin@example.org \
    --password password

If successful, Airflow prints a confirmation that the admin user was created.

Starting the Web Server and Scheduler

To make sure the configuration is working correctly, we can start the Airflow web server and scheduler and log into the user interface. Run the following commands in a terminal to start the web server and scheduler:

airflow webserver --port 8080 -D
airflow scheduler

This starts the Airflow web interface on port 8080. The -D flag runs the web server as a daemon in the background, while the scheduler keeps running in the foreground of the current terminal.
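Besides logging in through the browser, you can poll Airflow’s built-in /health endpoint to confirm that the metadata database and scheduler are up. A minimal sketch (assuming the requests package is installed in the same environment):

import requests

# The web server exposes a JSON health report at /health
print(requests.get('http://localhost:8080/health', timeout=5).json())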

Login to Airflow

After loading the user interface, we will see the login page. Enter the user credentials created in the previous step:

Username: admin

Password: password

If everything is configured successfully, then we will see the Airflow web interface with a list of DAG examples:

Now we can return to our terminal and stop the scheduler process with Ctrl+C (optional, but I recommend doing this so the scheduler gets its own terminal).

Open a new terminal and run the command: airflow scheduler

Configuring Airflow

Let’s set up Airflow following some best practices.

Let’s list the DAGs first. Go back to our first terminal and run the command: airflow dags list

Now let’s edit the configuration file, which should be at /root/airflow/airflow.cfg. This file contains all of our Airflow settings.

These settings are represented as key/value pairs, which look like
setting_name = setting_value
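As a quick sanity check, the same values can be read programmatically. A minimal sketch using Airflow’s configuration API (run from the same environment):

from airflow.configuration import conf

# Print the current DAG folder and metadata database connection string
print(conf.get('core', 'dags_folder'))
print(conf.get('database', 'sql_alchemy_conn'))  # lives under [core] in versions before 2.3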

Change the location of the DAG folder

We are going to use a new folder for our Python DAG code. First we must update the DAG folder setting to the new path. For our purposes, we will use the path /root/airflow_demo/dags.

Change the dags_folder setting (line 4 of the file, near the top) so it looks like this:
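dags_folder = /root/airflow_demo/dags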

This points Airflow to the project directory where new DAG entities will be stored and created.

Disable DAG Examples

There are many examples of DAGs that are automatically available to us, as you can see in the user interface. This makes it a bit more difficult to see the DAGs we’ve created, so let’s hide these example DAGs.

Next, we change the load_examples parameter on line 51 so it looks like this:
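load_examples = False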

This prevents Airflow from loading DAG examples.

Change the color of the navigation bar

Our Airflow is currently running as our production environment. Anyone who works in DevOps will tell you that separating production and staging environments is incredibly important.

In general, visual differentiation works best, and luckily Airflow provides a setting for an obvious visual cue: it lets us control the color of the navigation bar.

Let’s update the navbar_color setting (in the [webserver] section of airflow.cfg) so everyone in this environment knows to be especially careful:

navbar_color = #ffc0cb

Force reload Airflow

We’re going to force Airflow to pick up the new settings so we don’t have to wait for it to do so on its own.

In the terminal, run the following command again:

airflow dags list

Now our DAG list should be empty (because there is nothing in /root/airflow_demo/dags yet).

We will need to reinitialize the Airflow database for some of these settings to be picked up. Let’s execute the following command:

airflow db init

Restart the web server

Finally, we will need to stop and restart the current web server process (you can simply kill it by its PID). To find the PID, run the following command:

cat /root/airflow/airflow-webserver.pid

And we can kill the process with this command:

kill $(cat /root/airflow/airflow-webserver.pid)

Once the Airflow web server has been stopped, we can run the following command to start it up again:

airflow webserver --port 8080 -D

We will also restart the scheduler.

First we will stop the scheduler. Let’s go to the terminal where the scheduler is running and execute the following:

Press Ctrl+C to stop it, then start it again with: airflow scheduler

Let’s test our Airflow

Let’s create a DAG file at /root/airflow_demo/dags/prove-things-work.py with the following code:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# Default arguments applied to every task in this DAG
default_args = {
    'owner': 'Vinoo',
    'depends_on_past': False,
    'email': ['email@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'CreateFile',
    default_args=default_args,
    start_date=datetime(2022, 1, 1, 0, 0),
    schedule_interval=timedelta(minutes=500),
    catchup=False)  # catchup is a DAG-level argument, not a task default

# Single task: write a file to prove the DAG ran
task1 = BashOperator(
    task_id='prove_things_work',
    bash_command='echo "hello, world!" > /root/create-this-file.txt',
    dag=dag)

Now let’s check the syntax of our DAG. Go back to our first terminal and run the command:

python3 airflow_demo/dags/prove-things-work.py

This will check for Python syntax errors in the file. If successful, there will be no output.
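If you want a check that also surfaces Airflow-level import problems (not just Python syntax errors), here is a minimal sketch using Airflow’s DagBag, assuming the DAG folder we configured above:

from airflow.models import DagBag

# Parse every file in the DAG folder and report any import errors
dag_bag = DagBag(dag_folder='/root/airflow_demo/dags', include_examples=False)
if dag_bag.import_errors:
    for path, error in dag_bag.import_errors.items():
        print(f'Import error in {path}:\n{error}')
else:
    print(f'Loaded {len(dag_bag.dags)} DAG(s) with no import errors')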

As our final step, instead of waiting for the scheduler to pick it up, let’s update the Airflow database so our DAG is initialized by Airflow:

airflow db init

As we can see, Airflow has initialized our DAG.

Let’s run it: in the UI, press the play button and select Trigger DAG.

In the tree view panel, you will now see square icons next to each task in the DAG, changing color as tasks are queued and executed. If the tasks run successfully, these icons will be marked in dark green in the user interface. If the tasks are not completed, the squares will be marked in red.

Did it work? Let’s make sure the DAG completed successfully by executing the following command: cat /root/create-this-file.txt

If you see the phrase “hello, world!”, your DAG has worked!

Now let’s create a two-node DAG

Let’s create a file named two-node-dag.py in the same DAGs folder with the following code:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'start_date': datetime(2022, 1, 1)
}

with DAG(
    dag_id='two-node-dag',
    description='An example Airflow DAG',
    schedule_interval=None,
    default_args=default_args,
    catchup=False  # catchup is a DAG-level argument, not a task default
) as dag:

    t0 = BashOperator(
        task_id='bash_task_0',
        bash_command='echo "Hi there, this is the first Airflow task!"'
    )

    t1 = BashOperator(
        task_id='bash_task_1',
        bash_command='echo "Sleeping..." && sleep 5s && date'
    )

    # t0 runs before t1
    t0 >> t1

Let’s test the syntax:

python3 airflow_demo/dags/two-node-dag.py

Back to our UI

If the scheduler is running, Airflow will pick up the new DAG after some time. However, if you don’t want to wait, you can run the following command to force the sync:

airflow db init

We see that our DAG is now visible to Airflow.

However, this time we will run it from the terminal:

airflow dags trigger two-node-dag
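If you’d rather check the run status from code than from the UI, a small sketch using Airflow’s DagRun model (querying the metadata database we configured above) is:

from airflow.models import DagRun

# Find all runs of the DAG we just triggered and print their states
for run in DagRun.find(dag_id='two-node-dag'):
    print(run.run_id, run.state)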

Finally, let’s look at the logs of our DAG. For this, we return to the UI:

  • Click on our DAG.

  • Go to the Graph view.

  • Choose a task.

  • In the popup window, select Log.

We have successfully configured and launched several DAGs in Airflow. Let’s summarize:

  • Deploying and configuring Airflow is easy.

  • Creating a DAG in Airflow is an easy and straightforward process.

  • DAGs are defined by code.

  • DAG entities are quite flexible in use.


Soon OTUS will host an open lesson on the topic “MapReduce: Big Data Processing Algorithm”. In it, we will take a detailed look at the universal algorithm used to process big data on distributed systems without shared storage (Hadoop, Spark), discuss bottlenecks and potential operational problems, and see how it looks in practice in Yandex.Cloud. Registration is open to everyone.
