Everything You Need to Know About Airflow DAGs Part 3 – Designing a DAG

Previous: Part 1 Fundamentals and Schedules, Part 2 Operators and Sensors

3. DAG Design

Since Airflow is 100% code, knowing the basics of Python is all it takes to get started writing DAGs. However, writing efficient, secure, and scalable DAGs requires some Airflow-specific considerations. In this section, we’ll look at some DAG development best practices that make the most of Airflow’s capabilities.

In general, most of the best practices we review here fall into one of two categories: DAG design, and using Airflow for what it was built for, orchestration.

Reviewing Idempotency

Before we get into Airflow-specific recommendations, we need to look at one concept that applies to all data pipelines.

Idempotency is the foundation for many computing practices, including the Airflow best practices in this section. A computational operation is considered idempotent if it always produces the same result when given the same inputs.

In the context of Airflow, a DAG is considered idempotent if rerunning the same DAG run produces the same result every time. Designing idempotent DAGs decreases recovery time after failures and prevents data loss.
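As a simple illustration, compare a load step that appends to a single file with one that overwrites a partition keyed by the run’s logical date; only the second produces the same result no matter how many times you rerun it. The paths and function names below are hypothetical, a minimal sketch of the idea rather than a recommended implementation:

import csv

# Not idempotent: every rerun appends duplicate rows to the same file
def load_not_idempotent(records):
    with open("/data/sales.csv", "a") as f:
        csv.writer(f).writerows(records)

# Idempotent: reruns for the same logical date ("ds") overwrite the same
# partition and always produce the same output
def load_idempotent(records, ds):
    with open(f"/data/sales/{ds}.csv", "w") as f:
        csv.writer(f).writerows(records)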

DAG Design

The following DAG design principles will help make your DAGs usable, efficient, and readable.

Keep each task atomic

When breaking down your pipeline into separate tasks, ideally each task should be atomic. This means that each task should be responsible for one operation that can be rerun independently of the others. Put another way: in an atomized task, a success in part of the task means a success of the entire task.

For example, in an ETL pipeline, you would ideally want your Extract, Transform, and Load operations to be covered by three separate tasks. Atomizing these tasks lets you rerun each operation in the pipeline independently, which supports idempotency.
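As a sketch of this split, the DAG below expresses Extract, Transform, and Load as three independent tasks using Airflow 2’s TaskFlow API; the task bodies are placeholders rather than real extract, transform, and load logic:

from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval="@daily", start_date=datetime(2021, 1, 1), catchup=False)
def atomic_etl():

    @task
    def extract():
        # One operation: pull raw records from the source system (placeholder)
        return [{"id": 1, "amount": 100}, {"id": 2, "amount": 250}]

    @task
    def transform(records):
        # One operation: apply business logic only
        return [{**r, "amount_usd": r["amount"] / 100} for r in records]

    @task
    def load(records):
        # One operation: write to the destination only (placeholder)
        print(f"Loading {len(records)} records")

    load(transform(extract()))

atomic_etl_dag = atomic_etl()

Because each step is its own task, a failure in the load can be retried or rerun without repeating the extract.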

Use template fields, variables and macros

With template fields in Airflow, you can pull values into a DAG using environment variables and Jinja templating. Compared to calling Python functions directly, using template fields helps keep your DAGs idempotent and ensures that functions aren’t executed on every scheduler heartbeat (for more on scheduler optimizations, see “Avoid Top-Level Code in Your DAG File” below).

Contrary to this best practice, the following example defines variables using Python datetime functions:

# Variables used by tasks
# Bad example - Define today's and yesterday's date using the datetime module
from datetime import datetime, timedelta

today = datetime.today()
yesterday = datetime.today() - timedelta(1)

If this code lives in a DAG file, these functions are executed on every scheduler heartbeat, which can degrade performance. Worse, it does not produce an idempotent DAG: if you needed to rerun a previously failed DAG run for a past date, you couldn’t, because datetime.today() is relative to the current date, not to the logical date of the DAG run.

A better way to implement this is to use an Airflow template variable:

# Variables used by tasks
# Good example - Define yesterday's date with an Airflow template variable
yesterday = '{{ yesterday_ds_nodash }}'

You can use one of Airflow’s many built-in variables and macros, or create your own template fields to pass information at runtime. For more on this topic, check out our guide to templates and macros in Airflow.
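For instance, a templated value like the one above is only rendered when it is passed to a templated field of an operator at run time. The sketch below passes a built-in macro into the templated sql field of a PostgresOperator (the table and connection names are illustrative):

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

with DAG('templated_fields_example',
         start_date=datetime(2021, 1, 1),
         schedule_interval="@daily",
         catchup=False
) as dag:

    # sql is a templated field, so {{ yesterday_ds }} is rendered per DAG run,
    # not when the scheduler parses the file
    delete_yesterday = PostgresOperator(
        task_id='delete_yesterday',
        postgres_conn_id='postgres_default',
        sql="DELETE FROM covid_state_data WHERE date = '{{ yesterday_ds }}';"
    )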

Incremental Record Filtering

It’s ideal to break your pipelines into incremental extracts and loads wherever possible. For example, if you have a DAG that runs hourly, each run of the DAG should process only the records for that hour, not the entire dataset. When the results of each DAG run represent only a small subset of your total dataset, a failure in one subset of the data won’t prevent the rest of your DAG runs from completing successfully. And if your DAGs are idempotent, you can rerun the DAG for only the data that failed instead of reprocessing the entire dataset.

There are several ways to create incremental pipelines. The two best and most common methods are described below.

Using a “last modified” date is the gold standard for incremental loads. Ideally, each record in your source system has a column containing the time the record was last modified. Each DAG run then looks for records that were updated within its date range, based on that column.

For example, if the DAG runs hourly, each DAG run is responsible for loading any records that were modified between the start and end of its hour. If any of those runs fail, the other runs are not affected.
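A minimal sketch of this pattern, assuming an hourly schedule, a source table with a last_modified_date column (both hypothetical), and the data interval macros available in Airflow 2.2+:

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

with DAG('incremental_load_example',
         start_date=datetime(2021, 1, 1),
         schedule_interval="@hourly",
         catchup=False
) as dag:

    # Each run only touches records modified within its own data interval
    fetch_new_records = PostgresOperator(
        task_id='fetch_new_records',
        postgres_conn_id='postgres_default',
        sql="""
            SELECT *
            FROM source_records
            WHERE last_modified_date >= '{{ data_interval_start }}'
              AND last_modified_date < '{{ data_interval_end }}';
        """
    )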

If a “last modified” date is not available, a sequence or incrementing ID can be used for incremental loads. This logic works best when the source records are only ever appended, never updated. While we recommend building a “last modified” date into your source records whenever possible, basing your incremental logic on a sequence ID can be a sound way to filter pipeline records when no last-modified date is available.

Avoid Top-Level Code in Your DAG File

In the context of Airflow, “top-level code” refers to any code in a DAG file that is not part of your DAG definition or operator instantiations.

Airflow executes all code in the dags_folder on every min_file_process_interval, which defaults to 30 seconds (you can read more about this setting in the Airflow documentation). Because of this, top-level code that makes requests to external systems, such as an API or a database, or that calls functions outside of your tasks, can cause performance problems. In addition, including code in a DAG file that is not part of your DAG or operator instances makes the DAG harder to read, maintain, and update.
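For reference, the parsing interval is controlled in airflow.cfg (or the matching AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL environment variable), shown here with its default value:

[scheduler]
# How often (in seconds) the scheduler parses each file in the dags_folder
min_file_process_interval = 30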

Treat your DAG file like a config file and leave all the heavy lifting to the hooks and operators you instantiate in it. If your DAG needs access to additional code, such as a SQL script or a Python function, keep that code in a separate file that can be read at runtime.

As an example of what not to do, in the DAG below a PostgresOperator executes a SQL query that has been pasted directly into the DAG file:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

#Top-level code - these run on every scheduler parse (see the bad example above)
today = datetime.today()
yesterday = datetime.today() - timedelta(1)

#Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

#Instantiate DAG
with DAG('bad_practices_dag_1',
         start_date=datetime(2021, 1, 1),
         max_active_runs=3,
         schedule_interval="@daily",
         default_args=default_args,
         catchup=False
) as dag:

    t0 = DummyOperator(task_id='start')

    #Bad example with top-level SQL code in the DAG file
    query_1 = PostgresOperator(
        task_id='covid_query_wa',
        postgres_conn_id='postgres_default',
        sql='''with yesterday_covid_data as (
                SELECT * FROM covid_state_data
                WHERE date = {{ params.yesterday }}
                  AND state = 'WA'
            ),
            today_covid_data as (
                SELECT * FROM covid_state_data
                WHERE date = {{ params.today }}
                  AND state = 'WA'
            ),
            two_day_rolling_avg as (
                SELECT AVG(a.state, b.state) as two_day_avg
                FROM yesterday_covid_data a
                JOIN yesterday_covid_data b
                  ON a.state = b.state
            )
            SELECT a.state, b.state, c.two_day_avg
            FROM yesterday_covid_data a
            JOIN today_covid_data b
              ON a.state = b.state
            JOIN two_day_rolling_avg c
              ON a.state = b.two_day_avg;''',
        params={'today': today, 'yesterday': yesterday}
    )

Keeping the query in the DAG file makes the DAG harder to read and maintain. Instead, in the DAG below we reference a file named covid_state_query.sql in our PostgresOperator instance, which is the better approach:

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

#Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

#Instantiate DAG
with DAG('good_practices_dag_1',
         start_date=datetime(2021, 1, 1),
         max_active_runs=3,
         schedule_interval="@daily",
         default_args=default_args,
         catchup=False,
         template_searchpath="/usr/local/airflow/include" #include path to look for external files
) as dag:

    state = 'WA' #example value; in practice this might come from a loop over states

    query = PostgresOperator(
        task_id='covid_query_{0}'.format(state),
        postgres_conn_id='postgres_default',
        sql='covid_state_query.sql', #reference query kept in separate file
        params={'state': "'" + state + "'"}
    )
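For reference, the external file might look something like this (a hypothetical include/covid_state_query.sql; the operator’s params and the built-in ds macro are rendered when the query runs):

-- include/covid_state_query.sql (illustrative contents)
SELECT *
FROM covid_state_data
WHERE date = {{ ds }}
  AND state = {{ params.state }};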

Use a Consistent Method for Task Dependencies

In Airflow, task dependencies can be set in several ways. You can use the set_upstream() and set_downstream() functions, or you can use the << and >> bitshift operators. Which method you use is a matter of personal preference, but for readability it’s best to choose one method and stick with it.

For example, instead of mixing methods like this:

task_1.set_downstream(task_2)
task_3.set_upstream(task_2)
task_3 >> task_4

Try to be consistent – like this:

task_1 >> task_2 >> [task_3, task_4]

Take advantage of Airflow

The next category of recommendations pertains to using Airflow for what it was originally designed for: an orchestrator. Using Airflow as an orchestrator makes it easy to scale and use the right tools for your needs.

Use provider packages

One of the main factors in favor of using Airflow is its robust and active community, which has built integrations between Airflow and other tools. These integrations are distributed as provider packages.

Provider packages let you orchestrate third-party data processing jobs directly from Airflow. Wherever possible, it’s best to use these integrations rather than writing Python functions yourself (no need to reinvent the wheel). This also makes Airflow easier to adopt for teams that already use these tools, since there is less code to write.
To find all available provider packages, have a look at the Astronomer Registry.
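For example, rather than hand-writing a Python function that connects to Postgres, you can install the apache-airflow-providers-postgres package and use the hook and operator it ships with (the same PostgresOperator used elsewhere in this guide); a minimal illustration of the imports:

# Provided by the apache-airflow-providers-postgres package
# (installed with: pip install apache-airflow-providers-postgres)
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook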

Decide where to perform Data Processing Jobs

Since DAGs are written in Python, you have many options for implementing data processing. For small to medium workloads, it is usually safe to run data processing within Airflow, as long as you allocate enough resources to your Airflow infrastructure. Large data processing jobs are generally best offloaded to an environment specifically optimized for those use cases, such as Apache Spark. You can then use Airflow to orchestrate those jobs.
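For example, instead of processing a large dataset inside an Airflow worker, the DAG can simply submit a Spark job and let the cluster do the heavy lifting. A sketch using the SparkSubmitOperator from the Apache Spark provider package (the application path and connection ID are hypothetical):

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

with DAG('orchestrate_spark_example',
         start_date=datetime(2021, 1, 1),
         schedule_interval="@daily",
         catchup=False
) as dag:

    # Airflow only orchestrates; the data processing runs on the Spark cluster
    process_large_dataset = SparkSubmitOperator(
        task_id='process_large_dataset',
        application='/usr/local/airflow/include/process_large_dataset.py',
        conn_id='spark_default'
    )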

We recommend that you consider the size of your data now and in the future when deciding whether to process data within Airflow or offload it to an external tool. If your use case is well suited to processing data within Airflow, then we recommend the following:

  • Use a custom XCom backend if you need to pass data between tasks, so that you don’t overload your metadata database.

Use an Intermediate Data Store

Because it requires less code and fewer moving pieces, it can be tempting to write DAGs that move data directly from source to destination. However, this means you cannot rerun the extract or load portions of the pipeline individually. By putting an intermediate storage layer, such as S3 or staging SQL tables, between your source and destination, you can separate the testing and rerunning of the extract and load steps.

Depending on your data retention policy, you can modify the load logic and rerun the entire historical pipeline without having to rerun the extracts. This is also useful in situations where you no longer have access to the source system (for example, if you have hit your API request limit).
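A minimal sketch of this pattern using the TaskFlow API and the S3Hook from the Amazon provider package; the bucket name, keys, and load step are hypothetical, and an 'aws_default' connection is assumed to be configured:

from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@dag(schedule_interval="@daily", start_date=datetime(2021, 1, 1), catchup=False)
def staged_pipeline():

    @task
    def extract_to_s3(ds=None):
        # Stage the raw extract in S3, keyed by the logical date, so the load
        # step can be rerun later without touching the source system again
        raw_data = "id,amount\n1,100\n2,250"  # placeholder extract
        key = f"staging/sales/{ds}.csv"
        S3Hook(aws_conn_id="aws_default").load_string(
            string_data=raw_data,
            key=key,
            bucket_name="my-staging-bucket",
            replace=True,
        )
        return key

    @task
    def load_from_s3(key):
        # Read from the staging layer and load to the destination (placeholder)
        data = S3Hook(aws_conn_id="aws_default").read_key(
            key=key, bucket_name="my-staging-bucket"
        )
        print(f"Loading {len(data.splitlines()) - 1} records")

    load_from_s3(extract_to_s3())

staged_pipeline_dag = staged_pipeline()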

Use the ELT framework

Whenever possible, try to implement the ELT (Extract, Load, Transform) data pipeline pattern with your DAGs. This means pushing as much of the transformation logic as possible to the source or destination systems, which lets you leverage the strengths of all the tools in your data ecosystem. Many modern data warehouse tools, such as Snowflake, provide easy access to compute that supports the ELT pattern, and they integrate well with Airflow.
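In practice, this often means the extract and load tasks do little more than move raw data, while the transformation is a SQL statement executed inside the destination itself. A sketch using the same PostgresOperator shown earlier (table names are illustrative):

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

with DAG('elt_example',
         start_date=datetime(2021, 1, 1),
         schedule_interval="@daily",
         catchup=False
) as dag:

    # The transformation runs inside the destination database,
    # not on the Airflow workers
    transform_in_warehouse = PostgresOperator(
        task_id='transform_in_warehouse',
        postgres_conn_id='postgres_default',
        sql="""
            INSERT INTO sales_daily_summary (sale_date, total_amount)
            SELECT sale_date, SUM(amount)
            FROM raw_sales
            WHERE sale_date = '{{ ds }}'
            GROUP BY sale_date;
        """
    )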

More Best Practices

Finally, here are a few other noteworthy best practices that don’t fall into the two categories above.

Use a Consistent File Structure

Having a consistent file structure for Airflow projects keeps things organized and easier to implement. At Astronomer we use:

├── dags/ # Where your DAGs go
│   └── example-dag.py # An example DAG that comes with the initialized project
├── Dockerfile # For Astronomer's Docker image and runtime overrides
├── include/ # For any other files you'd like to include
├── plugins/ # For any custom or community Airflow plugins
├── packages.txt # For OS-level packages
└── requirements.txt # For any Python packages

Use the DAG Name and Start Date Correctly

You should always use a static start_date in your DAGs. A dynamic start_date is misleading and can lead to failures when clearing failed task instances, and to DAG runs being skipped or missed.

Also, if you change the start_date of your DAG, you should change the DAG’s name as well. Changing the start_date creates a new entry in the Airflow database, which can confuse the scheduler because there will be two DAGs with the same name but different schedules.

Changing the name of a DAG also creates a new entry in the database that powers the dashboard, so follow a consistent naming convention, since changing a DAG’s name does not delete the database entry for the old name.
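For example (the versioned name suffix is just one common convention, not an Airflow requirement):

from airflow import DAG
from datetime import datetime

# Good: a static start_date, and a DAG name that is versioned whenever the
# start_date or schedule changes
with DAG('covid_pipeline_v2',
         start_date=datetime(2021, 1, 1),
         schedule_interval="@daily",
         catchup=False
) as dag:
    ...

# Bad: a dynamic start_date, evaluated every time the file is parsed
# start_date=datetime.now()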

Set retries at the DAG level

Even if your code is perfect, failures happen. In a distributed environment where task containers run on shared hosts, tasks can be killed unexpectedly. When this happens, you may see a zombie process mentioned in the Airflow logs.

Problems like this can be solved with task retries. Best practice is to set retries in default_args so that they apply at the DAG level, and then override them on specific tasks only where necessary. A good range is 2 to 4 retries.
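A minimal sketch of this pattern (the values are illustrative):

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta

# Applied to every task in the DAG unless a task overrides them
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

with DAG('retries_example',
         start_date=datetime(2021, 1, 1),
         schedule_interval="@daily",
         default_args=default_args,
         catchup=False
) as dag:

    uses_dag_level_retries = DummyOperator(task_id='uses_dag_level_retries')

    # A known-flaky task can override the DAG-level default
    needs_more_retries = DummyOperator(task_id='needs_more_retries', retries=4)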
