Everything You Need to Know About Airflow DAGs Part 3.1 – Template Creation
Previous: Part 1 Fundamentals and Schedules, Part 2 Operators and Sensors, Part 3 DAG Design
Overview
Templating is a powerful concept in Airflow for passing dynamic information to task instances at run time. For example, let’s say you want to print the day of the week to the terminal each time you run a task:
BashOperator(
    task_id="print_day_of_week",
    bash_command="echo Today is {{ execution_date.format('dddd') }}",
)
In this example, the value in double curly braces {{ }} is our templated code, which is evaluated at runtime. If we run this code on a Wednesday, the BashOperator prints “Today is Wednesday”. Templating is important in many use cases. For example, we can use templates to create a new directory named after a task’s execution date to store daily data (for example, /data/path/20210824), or to select a specific partition (for example, /data/path/yyyy=2021/mm=08/dd=24) so that only the data relevant to a given execution date is read.
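The two path layouts mentioned above can be sketched with plain jinja2, outside of Airflow. This is a minimal illustration, assuming a standard library datetime is passed in under the name execution_date; inside Airflow the variable is supplied for you, and the base path is only a placeholder.

```python
from datetime import datetime

from jinja2 import Template

# Assumption: a plain datetime stands in for the value Airflow injects at runtime.
execution_date = datetime(2021, 8, 24)

# One directory per day, named after the execution date.
daily_dir = Template(
    "/data/path/{{ execution_date.strftime('%Y%m%d') }}"
).render(execution_date=execution_date)

# Partitioned layout with separate year, month, and day components.
partition_dir = Template(
    "/data/path/yyyy={{ execution_date.strftime('%Y') }}"
    "/mm={{ execution_date.strftime('%m') }}"
    "/dd={{ execution_date.strftime('%d') }}"
).render(execution_date=execution_date)

print(daily_dir)      # /data/path/20210824
print(partition_dir)  # /data/path/yyyy=2021/mm=08/dd=24
```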
Airflow uses Jinja, a Python templating framework, as its templating engine. In this tutorial, we’ll cover how to apply Jinja templating in your code, including:
What variables and functions are available when creating templates
Which operator fields can be templated and which cannot
How to check (validate) templates
How to use custom variables and functions when creating templates
How to render templates as strings and as native Python code
Runtime variables in Airflow
Templating in Airflow works exactly like templating with Jinja in Python: place the code to be evaluated between double curly braces and the expression will be evaluated at runtime. As we saw in the previous code snippet, execution_date is a variable available at runtime.
Airflow includes many variables that can be used to create templates. Here are some of the most commonly used:
execution_date: A datetime marking the start of the DAG run’s interval
ds: execution_date formatted as “2021-08-27”
ds_nodash: execution_date formatted as “20210827”
next_ds: The next execution_date (= end of the current interval), formatted like ds
For a complete list of all available variables, see the Apache Airflow documentation.
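To make the relationship between these variables concrete, the sketch below derives the string variants from a single datetime using only the standard library. It assumes a daily schedule so that the next interval starts one day later; Airflow computes these values itself, so this is an illustration of the formats rather than of Airflow's implementation.

```python
from datetime import datetime, timedelta

# Assumption: a daily schedule, so consecutive intervals are one day apart.
schedule = timedelta(days=1)
execution_date = datetime(2021, 8, 27)

ds = execution_date.strftime("%Y-%m-%d")
ds_nodash = execution_date.strftime("%Y%m%d")
next_ds = (execution_date + schedule).strftime("%Y-%m-%d")

print(ds)         # 2021-08-27
print(ds_nodash)  # 20210827
print(next_ds)    # 2021-08-28
```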
Template fields and scripts
Templates cannot be applied to all operator arguments. Two attributes in BaseOperator define the restrictions on templating:
template_fields: Specifies which fields can be templated
template_ext: Specifies which file extensions can be templated
Let’s look at a simplified version of the BashOperator:
from airflow.models.baseoperator import BaseOperator

class BashOperator(BaseOperator):
    template_fields = ('bash_command', 'env')  # defines which fields are templateable
    template_ext = ('.sh', '.bash')  # defines which file extensions are templateable

    def __init__(
        self,
        *,
        bash_command,
        env=None,
        output_encoding='utf-8',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.bash_command = bash_command  # templateable (can also give path to .sh or .bash script)
        self.env = env  # templateable
        self.output_encoding = output_encoding  # not templateable
template_fields contains a list of attributes that can be templated. You can also find this list in the Airflow documentation or, once a task has run, in the Airflow UI under Instance Details > template_fields.
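How template_fields restricts rendering can be sketched with a toy class and plain jinja2. ToyBashOperator and render_template_fields are hypothetical names used only for illustration; they are not Airflow's classes or functions, and the sketch handles string attributes only.

```python
from jinja2 import Environment


class ToyBashOperator:
    """Hypothetical stand-in for an operator; not Airflow's BashOperator."""

    template_fields = ("bash_command", "env")

    def __init__(self, bash_command, env=None, output_encoding="utf-8"):
        self.bash_command = bash_command
        self.env = env
        self.output_encoding = output_encoding


def render_template_fields(task, context):
    """Render, in place, only the string attributes listed in template_fields."""
    jinja_env = Environment()
    for name in task.template_fields:
        value = getattr(task, name)
        if isinstance(value, str):
            setattr(task, name, jinja_env.from_string(value).render(**context))


task = ToyBashOperator(bash_command="echo {{ ds }}", output_encoding="{{ ds }}")
render_template_fields(task, {"ds": "2021-08-27"})

print(task.bash_command)     # echo 2021-08-27
print(task.output_encoding)  # {{ ds }}  (not listed in template_fields, left untouched)
```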
template_ext contains a list of file extensions that can be read and templated at runtime. For example, instead of providing a Bash command to bash_command, you could provide a .sh script:
run_this = BashOperator(
    task_id="run_this",
    bash_command="script.sh",  # .sh extension can be read and templated
)
This task reads the contents of script.sh, renders the template, and executes the result:
# script.sh
echo "Today is {{ execution_date.format('dddd') }}"
Templating from files simplifies development (especially as your scripts get bigger) because an IDE can apply language-specific syntax highlighting to the script. This wouldn’t be possible if your script were defined as one big string in the Airflow code.
By default, Airflow searches for script.sh relative to the directory where the DAG file is defined. If your DAG is stored in /path/to/dag.py and your script is in /path/to/scripts/script.sh, you need to set bash_command to scripts/script.sh.
Optionally, additional “search paths” can be configured at the DAG level with the template_searchpath argument:
with DAG(..., template_searchpath="/tmp") as dag:
    run_this = BashOperator(task_id="run_this", bash_command="script.sh")
With the code above, you can save your Bash script in /tmp and Airflow will find it.
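The lookup behavior described above resembles a Jinja loader with an ordered list of search paths. The sketch below uses jinja2's FileSystemLoader with two temporary directories, one standing in for the DAG folder and one for an extra search path. It assumes this is a fair approximation of how template files are resolved and is not taken from Airflow's source.

```python
import tempfile
from pathlib import Path

from jinja2 import Environment, FileSystemLoader

with tempfile.TemporaryDirectory() as dag_dir, tempfile.TemporaryDirectory() as extra_dir:
    # A script stored in a subdirectory of the (stand-in) DAG folder.
    Path(dag_dir, "scripts").mkdir()
    Path(dag_dir, "scripts", "script.sh").write_text('echo "Today is {{ ds }}"')

    # A script stored in an additional search path (the role played by /tmp above).
    Path(extra_dir, "script.sh").write_text('echo "Extra path, {{ ds }}"')

    # Search paths are tried in order: the DAG folder first, then the extra path.
    env = Environment(loader=FileSystemLoader([dag_dir, extra_dir]))

    relative_hit = env.get_template("scripts/script.sh").render(ds="2021-08-27")
    extra_hit = env.get_template("script.sh").render(ds="2021-08-27")

print(relative_hit)  # echo "Today is 2021-08-27"
print(extra_hit)     # echo "Extra path, 2021-08-27"
```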
Validating templates
Template output can be checked in both the Airflow UI and the CLI. One of the benefits of the CLI is that you don’t have to run any tasks before seeing the result.
The Airflow CLI command airflow tasks render displays all templated attributes of a given task. Given a dag_id, a task_id, and an arbitrary execution_date, the command outputs something like this:
$ airflow tasks render example_dag run_this 2021-01-01
# ----------------------------------------------------------
# property: bash_command
# ----------------------------------------------------------
echo "Today is Friday"
# ----------------------------------------------------------
# property: env
# ----------------------------------------------------------
None
For this command to work, Airflow must have access to a metadata database (metastore). For this purpose, you can quickly set up a local metastore based on SQLite:
cd [your project dir]
export AIRFLOW_HOME=$(pwd)
airflow db init # generates airflow.db, airflow.cfg, and webserver_config.py in your project dir
# airflow tasks render [dag_id] [task_id] [execution_date]
For most templates, this will be sufficient. However, if your templating logic reaches any external systems (for example, a variable stored in an external store), you must have a connection to those systems.
In the Airflow UI, you can view the rendered template attributes after a task has run. Click the “Rendered” button on the task instance to see the result. Clicking this button opens the rendered template view, which shows the output of the templated attributes.
Using Custom Functions and Variables in Templates
As shown above, we have several variables (for example, execution_date and ds) available during templating. For various reasons, such as security, the Jinja runtime is different from the Airflow runtime. You can think of the Jinja environment as a very stripped-down Python environment. This means, among other things, that modules cannot be imported. For example, this code doesn’t work in a Jinja template:
from datetime import datetime

BashOperator(
    task_id="print_now",
    bash_command="echo It is currently {{ datetime.now() }}",  # raises jinja2.exceptions.UndefinedError: 'datetime' is undefined
)
However, it is possible to inject functions into your Jinja environment. Airflow exposes several standard Python modules to templates under the name “macros”. For example, the faulty code above can be fixed with macros.datetime:
BashOperator(
    task_id="print_now",
    bash_command="echo It is currently {{ macros.datetime.now() }}",  # It is currently 2021-08-30 13:51:55.820299
)
For a complete list of all available macros, refer to the Apache Airflow documentation.
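The same behavior can be reproduced with plain jinja2: a name that was imported in the surrounding Python file is unknown to the template unless it is passed into the rendering context. In this sketch a SimpleNamespace called macros is a hypothetical stand-in for the object Airflow provides, and a fixed date is used instead of now() to keep the output stable.

```python
from datetime import datetime
from types import SimpleNamespace

from jinja2 import Environment
from jinja2.exceptions import UndefinedError

env = Environment()

# Importing datetime in this module does not make it visible inside the template.
try:
    env.from_string("{{ datetime(2021, 8, 30).year }}").render()
    error_message = None
except UndefinedError as exc:
    error_message = str(exc)

# Passing a namespace into the context makes the class reachable from the template.
macros = SimpleNamespace(datetime=datetime)  # hypothetical stand-in for Airflow's macros
rendered = env.from_string("{{ macros.datetime(2021, 8, 30).year }}").render(macros=macros)

print(error_message)  # message naming 'datetime' as undefined
print(rendered)       # 2021
```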
In addition to the provided functions, you can also use self-defined variables and functions in your templates. Airflow provides a convenient way to inject them into the Jinja environment. Let’s say that in our DAG we want to print the number of days since May 1, 2015, and we have written a convenient function for this:
def days_to_now(starting_date):
    return (datetime.now() - starting_date).days
To use this inside a Jinja template, you can pass a dictionary to user_defined_macros in the DAG:
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

def days_to_now(starting_date):
    return (datetime.now() - starting_date).days

with DAG(
    dag_id="demo_template",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    user_defined_macros={
        "starting_date": datetime(2015, 5, 1),  # Macro can be a variable
        "days_to_now": days_to_now,  # Macro can also be a function
    },
) as dag:
    print_days = BashOperator(
        task_id="print_days",
        bash_command="echo Days since {{ starting_date }} is {{ days_to_now(starting_date) }}",  # Call user defined macros
    )
    # Days since 2015-05-01 00:00:00 is 2313
You can also inject functions as Jinja filters using user_defined_filters. Filters can be used as pipe operations. The following example does the same job as the previous example, only this time using filters:
with DAG(
    dag_id="bash_script_template",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    user_defined_filters={"days_to_now": days_to_now},  # Set user_defined_filters to use function as pipe-operation
    user_defined_macros={"starting_date": datetime(2015, 5, 1)},
) as dag:
    print_days = BashOperator(
        task_id="print_days",
        bash_command="echo Days since {{ starting_date }} is {{ starting_date | days_to_now }}",  # Pipe value to function
    )
    # Days since 2015-05-01 00:00:00 is 2313
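In plain jinja2 terms, the two injection points correspond to an environment's globals (callable like functions) and its filters (applied with a pipe). The sketch below registers one function in both places. It assumes this mirrors what the two DAG arguments do, and it uses a hypothetical days_between helper with a fixed reference date instead of datetime.now() so the result is reproducible.

```python
from datetime import datetime

from jinja2 import Environment


def days_between(starting_date, end_date):
    """Hypothetical deterministic variant of days_to_now: whole days between two dates."""
    return (end_date - starting_date).days


env = Environment()

# Globals play the role of macros: variables and functions callable by name.
env.globals["starting_date"] = datetime(2015, 5, 1)
env.globals["reference_date"] = datetime(2021, 8, 30)
env.globals["days_between"] = days_between

# Filters live in a separate namespace and receive the piped value as first argument.
env.filters["days_between"] = days_between

as_macro = env.from_string("{{ days_between(starting_date, reference_date) }}").render()
as_filter = env.from_string("{{ starting_date | days_between(reference_date) }}").render()

print(as_macro)   # 2313
print(as_filter)  # 2313
```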
Functions injected with user_defined_filters and user_defined_macros are both available for use in the Jinja environment. Although they give the same result, we recommend using filters when you need to apply multiple custom functions, because the filter syntax improves the readability of your code. You can verify this by comparing the two methods:
"{{ name | striptags | title }}"  # chained filters are read naturally from left to right
"{{ title(striptags(name)) }}"  # nested function calls are harder to read because they are read from the inside out
Rendering native Python code
By default, Jinja templates are always rendered as Python strings. This is fine in almost all situations in Airflow, but sometimes it is desirable to render templates as native Python objects. If the code you’re calling doesn’t work with strings, you’re in trouble. Let’s look at an example:
def sum_numbers(*args):
    total = 0
    for val in args:
        total += val
    return total

sum_numbers(1, 2, 3)  # returns 6
sum_numbers("1", "2", "3")  # TypeError: unsupported operand type(s) for +=: 'int' and 'str'
Consider a scenario where you pass a list of values to this function by triggering a DAG with a configuration that contains some numbers:
with DAG(dag_id="failing_template", start_date=datetime.datetime(2021, 1, 1), schedule_interval=None) as dag:
    sumnumbers = PythonOperator(
        task_id="sumnumbers",
        python_callable=sum_numbers,
        op_args="{{ dag_run.conf['numbers'] }}",
    )
Suppose we trigger the DAG with the following JSON as the DAG run configuration:
{"numbers": [1,2,3]}
The rendered value will be a string. Since the function sum_numbers unpacks the given string, it ends up trying to add up every character in the string:
('[', '1', ',', ' ', '2', ',', ' ', '3', ']')
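The failure can be reproduced without Airflow, using only the standard library. The sketch assumes the rendered template is equivalent to the string form of the list, which is what the character sequence above suggests.

```python
def sum_numbers(*args):
    total = 0
    for val in args:
        total += val
    return total


# Assumption: the rendered template is the string form of the list.
rendered = str([1, 2, 3])

# Unpacking a string yields its individual characters.
unpacked = tuple(rendered)
print(unpacked)  # ('[', '1', ',', ' ', '2', ',', ' ', '3', ']')

try:
    sum_numbers(*rendered)
    error = None
except TypeError as exc:
    error = exc

print(type(error).__name__)  # TypeError
```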
This won’t work, so we have to tell Jinja to return a native Python list instead of a string. Jinja supports this through its environments. The default Jinja Environment outputs strings, but we can configure a NativeEnvironment, which renders templates as native Python objects.
Support for the Jinja NativeEnvironment was added in Airflow 2.1.0 through the render_template_as_native_obj argument of the DAG class. This argument takes a boolean value that determines whether templates are rendered with the default Jinja environment or with NativeEnvironment. For example:
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def sum_numbers(*args):
    total = 0
    for val in args:
        total += val
    return total

with DAG(
    dag_id="native_templating",
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval=None,
    render_template_as_native_obj=True,  # Render templates using Jinja NativeEnvironment
) as dag:
    sumnumbers = PythonOperator(
        task_id="sumnumbers",
        python_callable=sum_numbers,
        op_args="{{ dag_run.conf['numbers'] }}",
    )
Passing the same configuration JSON {"numbers": [1,2,3]} now renders a list of integers, which the sum_numbers function handles correctly:
[2021-08-26 11:53:12,872] {python.py:151} INFO - Done. Returned value was: 6
Finally, note that the Jinja environment must be configured at the DAG level. This means that all tasks in a DAG are rendered either with the default Jinja environment or with the NativeEnvironment.
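The difference between the two environments can also be seen in plain jinja2. In this sketch a nested dictionary is a hypothetical stand-in for the dag_run object, so that the same template expression can be rendered once with each environment.

```python
from jinja2 import Environment
from jinja2.nativetypes import NativeEnvironment

template = "{{ dag_run.conf['numbers'] }}"

# Assumption: a nested dict stands in for Airflow's dag_run object.
context = {"dag_run": {"conf": {"numbers": [1, 2, 3]}}}

as_string = Environment().from_string(template).render(**context)
as_native = NativeEnvironment().from_string(template).render(**context)

print(repr(as_string))  # '[1, 2, 3]'
print(repr(as_native))  # [1, 2, 3]
print(sum(as_native))   # 6
```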
Conclusion
To sum up, you can use templates to give your tasks specific behavior by evaluating template expressions at runtime. There are a few things to keep in mind when templating in Airflow:
Airflow provides some handy settings for Jinja, Python’s templating engine
There are several variables available at runtime that can be used when templating (see the documentation)
template_fields and template_ext define what can be templated on an operator
Several Python libraries are injected into the Jinja environment (see the documentation)
Custom variables and functions can be injected with user_defined_filters and user_defined_macros
By default, Jinja templates return strings, but native Python objects can be returned by setting render_template_as_native_obj=True on your DAG
For information on all Jinja templating features, refer to the Jinja documentation