Airflow in Kubernetes. Part 1

Greetings!

On the path of a data engineer, tasks related to DevOps come up regularly. One of them is deploying Airflow in a Kubernetes cluster. If you have not done similar work before, this task may seem far from trivial. Of course, you can run a few commands from the official guide, but if you later need to fix something, it will be difficult without understanding what is happening inside. This article is intended to make the task easier. It will help those who have already worked with Airflow but have not yet touched Kubernetes.

To make this topic easier to understand, we will use a prepared repository. It is based on the official Airflow Helm chart, simplified.

Preliminary work

The Airflow we will deploy has the following architecture (Fig. 1):

Fig. 1 Airflow architecture

As you can see, it’s a pretty standard setup: the executor type is Celery, the message broker is Redis, and the database is PostgreSQL.

Before moving on to the aspects of working with Kubernetes, there are a few preliminary things you need to do:

  • Creating a cluster. I recommend using a managed Kubernetes service in one of the clouds. Personally, I used Yandex Cloud. Instructions for creating a cluster can be found here.

  • Installing kubectl – a utility that lets you interact with the cluster.

  • Installing Helm. This tool is described in more detail below.

  • Creating a database. Here you can also use a ready-made cloud solution – Managed Service for PostgreSQL – or host it yourself.

Helm

Helm is used to install applications in a Kubernetes cluster. This tool provides a wide range of capabilities, but for now we are interested in one of them – templating.

The main entity in Helm is the chart. A Helm chart is a package containing templates and the information needed to render them. Let’s take a closer look at the contents of our chart (see the repository). There we will find:

  • Chart.yaml – some information about our chart

  • templates – directory with all templates that will be deployed to the Kubernetes cluster

  • values.yaml – here we define the variables that we will use in templates
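
Schematically, the chart layout in the repository looks roughly like this (the individual template file names are illustrative; check the repository for the exact list):

dn-airflow/part1/
├── Chart.yaml        # chart metadata
├── values.yaml       # variables used in the templates
└── templates/        # manifests that Helm will render
    ├── _helpers.yaml # helper functions (see below)
    └── ...           # Deployment, StatefulSet, ConfigMap, Secret, Job, Service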

For example, suppose we have an executor value that we want to use. Let’s write it in values.yaml:

executor: "CeleryExecutor"

Now, to use this variable in the scheduler template, you need to write the following in the corresponding template file:

{{ .Values.executor }}

Looks like a Jinja template in Airflow, doesn’t it?
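
As a hypothetical illustration of where such a substitution could be used (in the actual chart the executor value ends up in airflow.cfg via the ConfigMap shown below), a scheduler template could pass it to Airflow through an environment variable:

env:
  - name: AIRFLOW__CORE__EXECUTOR
    value: "{{ .Values.executor }}"

After rendering, AIRFLOW__CORE__EXECUTOR would be set to CeleryExecutor.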

However, sometimes it is inconvenient to simply put raw values into values.yaml; you may want to pre-process them first. For this we can use _helpers.yaml, which lives alongside the templates in the templates directory. For example, suppose we want to combine the repository and tag values for a Docker image:

{{- define "airflow_image" -}}
  {{- $repository := .Values.images.airflow.repository -}}
  {{- $tag := .Values.images.airflow.tag -}}
    {{- printf "%s:%s" $repository $tag -}}
{{- end }}

Now we can call this function in the scheduler template:

{{ template "airflow_image" . }}
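
With the image values used in this chart (repository apache/airflow, tag 2.6.2, as seen in the rendered manifests below), a line such as

image: {{ template "airflow_image" . }}

renders to

image: apache/airflow:2.6.2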

The repository also uses more complex functions; to understand what they do, you can look at the rendered manifests. To generate them, run:

helm template airflow dn-airflow/part1/.

I have also committed the result of running this command to the repository. In this article I will work with the already rendered templates.

Deployment (Scheduler)

So let’s tell Kubernetes what we want it to do. To do this, we create manifests – YAML files in which we describe the desired state.

Let’s start with the manifest for the scheduler:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-scheduler
  labels:
    tier: airflow
    component: scheduler
    release: airflow
    chart: "airflow-1"
    executor: CeleryExecutor
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: scheduler
      release: airflow
  template:
    metadata:
      labels:
        tier: airflow
        component: scheduler
        release: airflow
    spec:
      initContainers:
        - name: wait-for-airflow-migrations
          image: apache/airflow:2.6.2
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
          args:
            - airflow
            - db
            - check-migrations
            - --migration-wait-timeout=60
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-fernet-key
                  key: fernet-key
            …
            - name: AIRFLOW__CORE__LOAD_EXAMPLES
              value: "True"
      containers:
        - name: scheduler
          image: apache/airflow:2.6.2
          imagePullPolicy: IfNotPresent
          args:
            - bash
            - -c
            - exec airflow scheduler
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-fernet-key
                  key: fernet-key
            …
            - name: AIRFLOW__CORE__LOAD_EXAMPLES
              value: "True"
          livenessProbe:
            initialDelaySeconds: 10
            timeoutSeconds: 20
            failureThreshold: 5
            periodSeconds: 60
            exec:
              command:
                - sh
                - -c
                - |
                  CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
                  airflow jobs check --job-type SchedulerJob --local
          volumeMounts:
            - name: logs
              mountPath: "/opt/airflow/logs"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
      volumes:
        - name: config
          configMap:
            name: airflow-airflow-config
        - name: logs
          emptyDir: {}

The first thing that catches our eye is the resource type:
kind: Deployment

To understand what a Deployment is, let’s start the dive with a simpler entity – the Pod. A Pod is one or more containers that run together on the same node, with shared network and storage. In our case there are two of them: the container with the scheduler itself and an init container that waits for migrations (more on this later).

Let us represent a Pod schematically (Fig. 2):

Fig. 2 Schematic representation of a Pod

To run multiple replicas of the same Pod we need a ReplicaSet (Fig. 3), a resource that ensures the number of Pods matches the count declared in the manifest.

Fig. 3 Schematic representation of a ReplicaSet

In production, Pods and ReplicaSets are usually not used directly. Instead, their parent object – the Deployment (Fig. 4) – is used, because a Deployment controls how Pods are updated. There are two update strategies. One of them is RollingUpdate, which replaces Pods one after another; in many cases this is more useful than first killing all Pods and then launching new ones with the new version of the application.

Fig. 4 Schematic representation of a Deployment

Let’s analyze in detail the Deployment that is used for the Airflow scheduler:

apiVersion: apps/v1
We set the API version that we will use for our resource. To view supported versions for a cluster, you can use the command:

kubectl api-versions

metadata
Here we set the name of the Deployment and also labels – optional information about our Deployment. Thanks to labels, we can filter resources when selecting them. For example, the command

kubectl get pods -n airflow -l component=scheduler

will return only the Pods with the label component=scheduler.

replicas
This is the number of replicas that will be deployed

selector
Here we describe how the ReplicaSet will understand which Pods to manage. We use matchLabels, where we specify labels. Exactly the same labels must be specified in the template.

template
Here we describe the Pod template. We also set metadata, in which we specify the labels (which the selector uses).

spec
The specification of our Pod. It includes information about the containers, environment variables, mounted volumes and other settings, which we will discuss below:

initContainers
These are containers that start before the main application. We use one to check whether the migrations have already been applied, using the command:

airflow db check-migrations

The description of initContainers is similar to the description of containers, which we will now consider in more detail:

containers

  • image: name of the image we will use

  • imagePullPolicy: the image pull policy. With IfNotPresent, the image is downloaded only if it is not already present on the node.

  • command and args: specify the command that is executed when the container starts.

  • env: environment variables that will be available inside the container. Besides ordinary name/value pairs, you can add variables whose values come from a Secret – another Kubernetes resource, which we will discuss below.

livenessProbe
Needed so that if something goes wrong with the application running in the Pod, Kubernetes notices it and restarts the broken Pod.
It contains several parameters:

  • initialDelaySeconds: how many seconds to wait before the first check. If the Pod takes a long time to start, it is worth setting a higher value so that Kubernetes does not kill the Pod before the application has started.

  • periodSeconds: how often to run checks.

  • timeoutSeconds: the time Kubernetes waits for a response from the application. If the application does not respond within this time, the check is considered failed.

  • failureThreshold: the number of consecutive failed checks before Kubernetes restarts the Pod.

  • exec: describes how we check the Pod’s state. In our case, we run a shell command.

volumeMounts
Defines the paths at which the volumes described in volumes are mounted.

volumes
In our case there are two of them: config, which mounts a ConfigMap (we will discuss this resource below), and logs, for which we create an emptyDir volume. Such storage exists only as long as the Pod is alive; after the Pod restarts, the logs directory will be empty.

StatefulSet (Worker)

To run stateful applications – Redis and the worker in our case – we use a StatefulSet.

Unlike a Deployment, it starts Pods with predictable names, for example worker-0, worker-1, and scales Pods in order. A StatefulSet also has a different policy for managing attached storage, which we will discuss in the next part. The reasons why the worker uses a StatefulSet can be found here.
The manifest is almost the same as for the Deployment, so we won’t dwell on it.
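
For reference, here is a minimal sketch of what the worker StatefulSet might look like, assembled by analogy with the scheduler Deployment above (fields are abbreviated and some values are illustrative; the full manifest is in the repository):

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: airflow-worker
  labels:
    tier: airflow
    component: worker
    release: airflow
spec:
  serviceName: airflow-worker   # headless Service that gives the Pods stable names
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: worker
      release: airflow
  template:
    metadata:
      labels:
        tier: airflow
        component: worker
        release: airflow
    spec:
      containers:
        - name: worker
          image: apache/airflow:2.6.2
          imagePullPolicy: IfNotPresent
          args:
            - bash
            - -c
            - exec airflow celery worker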

ConfigMap (Config)

When describing the scheduler, we mentioned the ConfigMap resource, which we mount at the path /opt/airflow/airflow.cfg. Let’s look at it.

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-airflow-config
  labels:
    tier: airflow
    component: config
    release: airflow
    chart: "airflow-1"
    heritage: Helm
data:
  airflow.cfg: |-
    [celery]
    worker_concurrency = 16

    [core]
    dags_folder = /opt/airflow/dags
    executor = CeleryExecutor
    load_examples = True

Most of the fields are familiar to us, except data. Here we describe the name and contents of the file that will be mounted.
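
Once the Pods are running, you can check that the file is actually mounted by reading it from the scheduler container (resource and container names as in the manifests above):

kubectl exec -n airflow deploy/airflow-scheduler -c scheduler -- cat /opt/airflow/airflow.cfg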

Secret (Airflow-fernet-key)

When discussing env we came across the Secret resource. Let’s look at it using the fernet key secret as an example:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-fernet-key
  labels:
    tier: airflow
    release: airflow
    chart: airflow
    heritage: Helm
  annotations:
    "helm.sh/hook": "pre-install"
    "helm.sh/hook-delete-policy": "before-hook-creation"
    "helm.sh/hook-weight": "0"
type: Opaque
data:
  fernet-key: "TTAxc05IQlBNakZsWVdwclEzSklXbFI2VkRWU01XUjFUM0JZVVV4aFV6ST0="

The description is almost the same as for the ConfigMap, but there is an additional field, type. Kubernetes has several types of secrets. We use the Opaque type, which holds arbitrary user data. The data field contains the secret itself, encoded in base64. Key generation and encoding are done by Helm.
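
For example, this is roughly how a value can be base64-encoded by hand, and how the deployed secret can be decoded back to verify it (secret and key names as above):

# encode an arbitrary string (the real key is generated by helm)
echo -n "my-secret-value" | base64

# decode the fernet key stored in the cluster
kubectl get secret airflow-fernet-key -n airflow -o jsonpath='{.data.fernet-key}' | base64 --decode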

In annotations you can also see Helm hooks, which allow you to control the deployment order. Because the Secret must be created before the scheduler is deployed, we use the “pre-install” hook.

Job (Airflow-run-airflow-migrations)

For the scheduler we need a Pod that runs constantly. But what if we need to perform a one-time action, for example apply database migrations? The Job resource is suitable for this.
Remember that in the scheduler Deployment we described an initContainer that checks whether the migrations have already been applied? The migration itself is described in a Job:

apiVersion: batch/v1
kind: Job
metadata:
  name: airflow-run-airflow-migrations
  labels:
    tier: airflow
    component: run-airflow-migrations
    release: airflow
    chart: "airflow-1"
    heritage: Helm
spec:
  template:
    metadata:
      labels:
        tier: airflow
        component: run-airflow-migrations
        release: airflow
    spec:
      restartPolicy: OnFailure
      containers:
        - name: run-airflow-migrations
          image: apache/airflow:2.6.2
          imagePullPolicy: IfNotPresent
          args:
            - bash
            - -c
            - |-
              exec \
              airflow db upgrade
          env:
            - name: PYTHONUNBUFFERED
              value: "1"
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-fernet-key
                  key: fernet-key
            …
            - name: AIRFLOW__CORE__LOAD_EXAMPLES
              value: "True"
          volumeMounts:
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
      volumes:
        - name: config
          configMap:
            name: airflow-airflow-config

restartPolicy is a new field for us: the OnFailure value means that the Job’s container will be restarted in case of failure.
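
After deployment you can make sure the migrations were applied by checking the Job status and, if needed, its logs (resource names as in the manifest above):

kubectl get jobs -n airflow
kubectl logs job/airflow-run-airflow-migrations -n airflow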

Service (Airflow-webserver)

It remains to consider one more resource – the Service.
It is needed to provide access to our application. For now we will use it so that we can reach the Airflow UI from a browser. This is not a production-grade solution, but it is just right for making sure everything works correctly:

apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver
  labels:
    tier: airflow
    component: webserver
    release: airflow
    chart: "airflow-1"
    heritage: Helm
spec:
  type: NodePort
  selector:
    tier: airflow
    component: webserver
    release: airflow
  ports:
    - name: airflow-ui
      port: 8080

There are several types of Services. We use NodePort, which exposes the Pod via the IP address of the node on which the Pod is running. To find out this IP, you can look it up in the Yandex Cloud console.

You also need to specify the port on which the webserver listens; it must match the port in Airflow’s own settings.
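
Besides the cloud console, the node IP and the assigned NodePort can also be looked up with kubectl; the UI will then be reachable at http://<node-ip>:<node-port>:

kubectl get nodes -o wide                          # the EXTERNAL-IP column shows the node address
kubectl get service airflow-webserver -n airflow   # PORT(S) shows 8080:<node-port>/TCP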

Namespace

Before deploying the application, we need to look at one more resource – the Namespace. It allows you to split the cluster into logical groups. We will deploy all Airflow components in a single Namespace. Different access policies can also be applied to different Namespaces.
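
In this guide the namespace is created by Helm itself via the --create-namespace flag (see the command below), but it can also be created explicitly:

kubectl create namespace airflow
kubectl get namespaces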

Checking that everything is working

Great, we have described all our resources. Let’s look at the diagram (Fig. 5) of how it all fits together:

Fig. 5 Kubernetes resources used by Airflow

Let’s now try to run our chart.
First we clone the repository:

git clone git@github.com:Siplatov/dn-airflow.git

Next, you need to replace the host, user and pass values for PostgreSQL with your own. These values are in values.yaml.
After this you can install the release:

helm upgrade --install -n airflow --create-namespace airflow dn-airflow/part1/.

After a few minutes, you can check whether the pods have already started (Fig. 6):

kubectl get pods -n airflow
Fig. 6 Checking the status of the Pods

We see that the Jobs completed successfully and all the other Pods are in Running status, which means everything is OK. However, it appears that the webserver restarted several times. To understand why, run the command:

kubectl describe pod airflow-webserver-dc54c9884-g7x5s -n airflow

At the end we will see the following picture (Fig. 7):

Fig. 7 Output of the describe pod command

The webserver did not manage to start within the time specified in the livenessProbe’s initialDelaySeconds; Kubernetes ran several checks, decided that the webserver was dead and restarted it. Therefore it is worth increasing initialDelaySeconds.
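
For example, a possible adjustment of the webserver probe could look like this (the other values are copied from the scheduler probe above; the exact delay depends on how long the webserver takes to start in your environment):

livenessProbe:
  initialDelaySeconds: 60
  timeoutSeconds: 20
  failureThreshold: 5
  periodSeconds: 60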

To open the Airflow UI, let’s look at the services (Fig. 8):

kubectl get services -n airflow
Fig. 8 Output of the get services command

We see that we need to connect on port 32462.

If, when opening the link, we see the login page (Fig. 9), then everything started successfully:

Fig. 9 Airflow authorization window

The login and password are specified in values.yaml, in the defaultUser section.

Conclusion

We’ve looked at the minimum set of entities required to run Airflow. However, several points still need improvement: the secrets live in a YAML file, logs are deleted after a re-deployment, there is no synchronization with the DAG repository, access is not over HTTPS, and so on. We will discuss how to fix all this in the next part.
