Everything You Need to Know About Airflow DAGs Part 3.1 – Template Creation

Previous: Part 1 – Fundamentals and Schedules, Part 2 – Operators and Sensors, Part 3 – DAG Design

Overview

Templating is a powerful concept in Airflow for passing dynamic information to task instances at run time. For example, let’s say you want to print the day of the week to the terminal each time you run a task:

from airflow.operators.bash import BashOperator

BashOperator(
    task_id="print_day_of_week",
    bash_command="echo Today is {{ execution_date.format('dddd') }}",
)

In this example, the value between double curly braces {{ }} is our templated code, which is evaluated at runtime. If we run this code on a Wednesday, the BashOperator prints "Today is Wednesday". Templating is important in many cases. For example, we can use it to create a new directory named after a task's execution date to store daily data (for example, /data/path/20210824), or to select a specific partition (for example, /data/path/yyyy=2021/mm=08/dd=24) so the correct data is read for each given date.
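
As a minimal sketch of the first use case (the path and task name are illustrative), the built-in ds_nodash variable covered below renders as the run date without dashes:

from airflow.operators.bash import BashOperator

# Illustrative only: create a directory named after the run's date,
# e.g. /data/path/20210824 for a run covering 2021-08-24
make_dated_dir = BashOperator(
    task_id="make_dated_dir",
    bash_command="mkdir -p /data/path/{{ ds_nodash }}",
)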

Airflow uses Jinja, a Python templating framework, as its template engine. In this tutorial, we'll cover how to apply Jinja templating in your code, including:

  • What variables and functions are available when creating templates

  • Which operator fields can be templated and which cannot

  • How to validate the output of templates

  • How to use custom variables and functions when creating templates

  • How to render templates to strings and to native Python code

Runtime variables in Airflow

Templating in Airflow works exactly like templating with Jinja in Python: place your code to be evaluated between double curly braces and the expression will be evaluated at runtime. As we saw in the previous code snippet, execution_date is a variable available at runtime.
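
To see that this is plain Jinja, here is a minimal standalone sketch using the jinja2 package directly, with no Airflow involved:

from jinja2 import Template

# Jinja evaluates the expression between double curly braces at render time
template = Template("Today is {{ day }}")
print(template.render(day="Wednesday"))  # Today is Wednesday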

Airflow includes many variables that can be used to create templates. Here are some of the most commonly used:

  • execution_date – the datetime marking the start of the DAG run's interval

  • ds – the execution_date formatted as "2021-08-27"

  • ds_nodash – the execution_date formatted as "20210827"

  • next_ds – the execution_date of the next interval (= the end of the current interval), formatted as "2021-08-28"

For a complete list of all available variables, see the Apache Airflow documentation.
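
For illustration, a small sketch printing several of these variables from a single task (the task name is hypothetical):

from airflow.operators.bash import BashOperator

# For a daily run with execution_date 2021-08-27, this prints something like:
# "ds=2021-08-27 ds_nodash=20210827 next_ds=2021-08-28"
print_dates = BashOperator(
    task_id="print_dates",
    bash_command="echo ds={{ ds }} ds_nodash={{ ds_nodash }} next_ds={{ next_ds }}",
)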

Template fields and scripts

Templates cannot be applied to all operator arguments. Two attributes on BaseOperator define an operator's templating behavior:

  • template_fields: Specifies which fields are templated

  • template_ext: Specifies which file extensions can be templated

Let's look at a simplified version of the BashOperator:

class BashOperator(BaseOperator):
    template_fields = ('bash_command', 'env')  # defines which fields are templateable
    template_ext = ('.sh', '.bash')  # defines which file extensions are templateable

    def __init__(
        self,
        *,
        bash_command,
        env=None,
        output_encoding='utf-8',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.bash_command = bash_command  # templateable (can also give path to .sh or .bash script)
        self.env = env  # templateable
        self.output_encoding = output_encoding  # not templateable

template_fields contains a list of attributes that can be templated. You can also find this list in the Airflow documentation, or in the Airflow UI for a running task, under Instance Details -> template_fields:
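
If you write a custom operator, you opt attributes into templating the same way. A minimal sketch (the operator and its field are hypothetical):

from airflow.models import BaseOperator

class GreetOperator(BaseOperator):
    template_fields = ("greeting",)  # only listed attributes are rendered by Jinja

    def __init__(self, *, greeting, **kwargs):
        super().__init__(**kwargs)
        self.greeting = greeting  # may contain {{ ... }} expressions

    def execute(self, context):
        self.log.info(self.greeting)  # already rendered when execute() runs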

template_ext contains a list of file extensions that can be read and templated at runtime. For example, instead of providing a Bash command to bash_command, you could provide a path to a .sh script:

run_this = BashOperator(
    task_id="run_this",
    bash_command="script.sh",  # .sh extension can be read and templated
)

This task reads the content of script.sh, renders it as a template, and executes it:

# script.sh
echo "Today is {{ execution_date.format('dddd') }}"

Templating from files simplifies development (especially as your scripts grow) because an IDE can apply language-specific syntax highlighting to the script. That wouldn't be possible if your script were defined as one long string inside the Airflow code.

By default, Airflow searches for script.sh relative to the directory in which the DAG file is defined. So if your DAG is stored in /path/to/dag.py and your script in /path/to/scripts/script.sh, you would set bash_command to scripts/script.sh.

Optionally, additional "search paths" can be configured at the DAG level with the template_searchpath argument:

with DAG(..., template_searchpath="/tmp") as dag:
    run_this = BashOperator(task_id="run_this", bash_command="script.sh")

The code above tells Airflow that it can also find your Bash script in /tmp.
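
If your scripts live in several directories, template_searchpath also accepts a list of paths; a small sketch with hypothetical paths:

with DAG(..., template_searchpath=["/tmp", "/opt/scripts"]) as dag:
    run_this = BashOperator(task_id="run_this", bash_command="script.sh")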

Validating templates

Template output can be checked in both the Airflow UI and the CLI. One of the benefits of the CLI is that you don’t have to run any tasks before seeing the result.

The Airflow CLI command airflow tasks render renders all templated attributes of a given task. Given a dag_id, task_id, and execution_date, the command outputs something like this:

$ airflow tasks render example_dag run_this 2021-01-01

# ----------------------------------------------------------
# property: bash_command
# ----------------------------------------------------------
echo "Today is Friday"

# ----------------------------------------------------------
# property: env
# ----------------------------------------------------------
None

For this command to work, Airflow needs access to a metastore. You can quickly set up a local SQLite-based metastore for this purpose:

cd [your project dir]
export AIRFLOW_HOME=$(pwd)
airflow db init  # generates airflow.db, airflow.cfg, and webserver_config.py in your project dir

# airflow tasks render [dag_id] [task_id] [execution_date]

For most templates, this will be sufficient. However, if your templating logic reaches external systems (for example, a variable stored in your metastore), you must also have connectivity to those systems.
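
For example, this hedged sketch references an Airflow Variable (my_var is a hypothetical name); rendering it requires access to the metastore where the variable is stored:

from airflow.operators.bash import BashOperator

# {{ var.value.my_var }} is looked up in the metastore at render time
print_var = BashOperator(
    task_id="print_var",
    bash_command="echo my_var is {{ var.value.my_var }}",
)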

In the Airflow UI, you can view the rendered template attributes after a task has run: click the "Rendered" button on the task instance. The rendered template view then shows the output of each templated attribute.

Using Custom Functions and Variables in Templates

As shown above, several variables (for example, execution_date and ds) are available during templating. For various reasons, such as security, the Jinja environment is not the same as the Airflow runtime; you can think of it as a very stripped-down Python environment. Among other things, that means modules cannot be imported. For example, this code doesn't work in a Jinja template:

from datetime import datetime

BashOperator(
    task_id="print_now",
    bash_command="echo It is currently {{ datetime.now() }}",  # raises jinja2.exceptions.UndefinedError: 'datetime' is undefined
)

However, it is possible to inject functions into your Jinja environment. Airflow ships several standard Python modules, exposed as "macros", for use in templates. For example, the erroneous code above can be fixed using macros.datetime:

BashOperator(
    task_id="print_now",
    bash_command="echo It is currently {{ macros.datetime.now() }}",  # It is currently 2021-08-30 13:51:55.820299
)

For a complete list of all available macros, refer to the Apache Airflow documentation.
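
One more commonly used macro is macros.ds_add, which shifts a date string by a given number of days; for example:

from airflow.operators.bash import BashOperator

# macros.ds_add("2021-08-27", 5) renders as "2021-09-01"
print_future_date = BashOperator(
    task_id="print_future_date",
    bash_command="echo Five days from {{ ds }} is {{ macros.ds_add(ds, 5) }}",
)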

In addition to the provided macros, you can also use your own variables and functions in templates, and Airflow provides a convenient way to register them with the Jinja environment. Let's say our DAG should display the number of days since May 1, 2015, and we wrote a small function for this:

from datetime import datetime

def days_to_now(starting_date):
    return (datetime.now() - starting_date).days

To use this inside a Jinja template, pass it via the user_defined_macros argument of the DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator


def days_to_now(starting_date):
    return (datetime.now() - starting_date).days


with DAG(
    dag_id="demo_template",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    user_defined_macros={
        "starting_date": datetime(2015, 5, 1),  # Macro can be a variable
        "days_to_now": days_to_now,  # Macro can also be a function
    },
) as dag:
    print_days = BashOperator(
        task_id="print_days",
        bash_command="echo Days since {{ starting_date }} is {{ days_to_now(starting_date) }}",  # Call user defined macros
    )
    # Days since 2015-05-01 00:00:00 is 2313

You can also inject functions into Jinja as filters, using user_defined_filters. Filters are applied with the pipe operator. The following example does the same job as the previous one, this time using a filter:

with DAG(
    dag_id="bash_script_template",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    user_defined_filters={"days_to_now": days_to_now},  # Set user_defined_filters to use function as pipe-operation
    user_defined_macros={"starting_date": datetime(2015, 5, 1)},
) as dag:
    print_days = BashOperator(
        task_id="print_days",
        bash_command="echo Days since {{ starting_date }} is {{ starting_date | days_to_now }}",  # Pipe value to function
    )
    # Days since 2015-05-01 00:00:00 is 2313

Functions registered via user_defined_filters and user_defined_macros are both available in the Jinja environment. Although they give the same result, we recommend using filters when you need to chain multiple custom functions, because the filter format improves the readability of your code. Compare the two approaches:

"{{ name | striptags | title }}"  # chained filters are read naturally from left to right
"{{ title(striptags(name)) }}"  # multiple functions are more difficult to interpret because reading right to left

Rendering native Python code

By default, Jinja templates always render to Python strings. This is fine in almost all situations in Airflow, but sometimes it's desirable to render templates to native Python objects. If the code you're calling doesn't work with strings, you're in trouble. Let's look at an example:

def sum_numbers(*args):
    total = 0
    for val in args:
        total += val
    return total

sum_numbers(1, 2, 3)  # returns 6
sum_numbers("1", "2", "3")  # TypeError: unsupported operand type(s) for +=: 'int' and 'str'

Consider a scenario where you pass a list of values to this function by triggering a DAG with a configuration containing some numbers:

with DAG(dag_id="failing_template", start_date=datetime.datetime(2021, 1, 1), schedule_interval=None) as dag:
    sumnumbers = PythonOperator(
        task_id="sumnumbers",
        python_callable=sum_numbers,
        op_args="{{ dag_run.conf['numbers'] }}",
    )

We want to trigger the DAG with the following JSON as the DAG run configuration:

{"numbers": [1,2,3]}

The rendered value will be a string. Since sum_numbers unpacks the given string, it ends up trying to add up every character in it:

('[', '1', ',', ' ', '2', ',', ' ', '3', ']')
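
You can reproduce this behavior in plain Python: unpacking a string into *args iterates over its characters:

def show_args(*args):
    return args

# The rendered template is the string "[1, 2, 3]", so each character becomes one argument
show_args(*"[1, 2, 3]")  # ('[', '1', ',', ' ', '2', ',', ' ', '3', ']')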

This won't work, so we must tell Jinja to return a proper Python list instead of a string. Jinja supports this via environments: the default Jinja environment outputs strings, but Jinja also provides a NativeEnvironment, which renders templates as native Python objects.

Support for Jinja's NativeEnvironment was added in Airflow 2.1.0 via the render_template_as_native_obj argument on the DAG class. This boolean argument determines whether templates are rendered with the default Jinja environment or with the NativeEnvironment. For example:

import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def sum_numbers(*args):
    total = 0
    for val in args:
        total += val
    return total


with DAG(
    dag_id="native_templating",
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval=None,
    render_template_as_native_obj=True,  # Render templates using Jinja NativeEnvironment
) as dag:
    sumnumbers = PythonOperator(
        task_id="sumnumbers",
        python_callable=sum_numbers,
        op_args="{{ dag_run.conf['numbers'] }}",
    )

Passing the same configuration JSON {"numbers": [1,2,3]} now renders a list of integers, which sum_numbers handles correctly:

[2021-08-26 11:53:12,872] {python.py:151} INFO - Done. Returned value was: 6
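
For reference, the DAG above can be triggered with this configuration from the CLI:

airflow dags trigger native_templating --conf '{"numbers": [1, 2, 3]}'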

Note that the Jinja environment is configured at the DAG level. This means all tasks in a DAG are rendered either with the default Jinja environment or with the NativeEnvironment, never a mix.

Conclusion

Templates let you give your tasks dynamic behavior by evaluating expressions at runtime. There are a few things to keep in mind when templating in Airflow:

  • Airflow uses Jinja, a Python templating framework, as its template engine

  • Several variables are available at runtime for use in templates (see the documentation)

  • template_fields and template_ext define what can be templated for an operator

  • Several Python modules are embedded in the Jinja environment as macros (see the documentation)

  • Custom variables and functions can be registered with user_defined_macros and user_defined_filters

  • By default, Jinja templates render to strings, but native Python objects can be returned by setting render_template_as_native_obj=True on your DAG

  • For information on all Jinja templating features, refer to the Jinja documentation
