Airflow in Kubernetes. Part 2


In the previous part, we deployed the main Airflow services. However, a few questions remain open:

  • Synchronizing a DAG list with a remote repository

  • Saving Worker logs

  • Configuring access from an external network for Webserver

In this part we will work through these questions. The code for this part of the article has been added to the repository.

Synchronizing a DAG list with a remote repository

Last time, to check that the deployment works, we used the example DAGs that ship with Airflow. In real life, however, we will write our own DAGs, so we set .Values.config.core.load_examples to false and turn to the git-sync tool.
This application synchronizes a branch of a given remote repository with a specified directory every few seconds. That directory lives on a volume (called dags in the example), which we also mount into the other containers, namely the Worker and Scheduler containers (Fig. 1). The Scheduler periodically scans this directory for new files and registers them in the database.

Fig. 1. Link between the repository and the Worker Pod

Because git-sync is a separate application that runs alongside the main one, we need an additional container in the worker Pod. Let’s define a template for git-sync in _helpers.yaml, just as the official helm chart does. We will only switch to the newer version 4: the names of the environment variables are different, but the basic principle is the same. This is what the git-sync settings will look like:

# git-sync container
- name: git-sync
  image: <git-sync v4 image>
  imagePullPolicy: IfNotPresent
  securityContext:
    runAsUser: 65533
  env:
      # Path to the ssh key
    - name: GITSYNC_SSH_KEY_FILE
      value: "/etc/git-secret/ssh"
      # Disable host verification
    - name: GITSYNC_SSH_KNOWN_HOSTS
      value: "false"
      # Name of the branch to synchronize with
    - name: GITSYNC_REF
      value: "master"
      # URL of the repository to synchronize with
    - name: GITSYNC_REPO
      value: ""
      # Directory for git-sync operations
    - name: GITSYNC_ROOT
      value: "/git"
      # Name of the directory that will contain the code from the repository
    - name: GITSYNC_LINK
      value: "repo"
      # How often to synchronize with the repository
    - name: GITSYNC_PERIOD
      value: "10s"
      # Number of failures after which execution is aborted
    - name: GITSYNC_MAX_FAILURES
      value: "0"
  volumeMounts:
  - name: dags
    mountPath: /git
  - name: git-sync-ssh-key
    mountPath: /etc/git-secret/ssh
    readOnly: true
    subPath: gitSshKey

# Pod volumes
volumes:
- name: config
  configMap:
    name: airflow-airflow-config
- name: dags
  emptyDir: {}
- name: git-sync-ssh-key
  secret:
    secretName: airflow-ssh-secret
    defaultMode: 256
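
As an illustration, the other side of this setup might look roughly like the fragment below: the same dags volume mounted into the worker container. This is only a sketch. The mount path is inferred from the DAG path shown later in this article, and the readOnly flag is an assumption rather than something taken from the chart.

```yaml
# Sketch of the worker container's volumeMounts (assumed, not copied from the chart).
- name: worker
  volumeMounts:
    # Same emptyDir volume that git-sync writes to under /git
    - name: dags
      mountPath: /opt/airflow/dags
      # Assumption: the worker only needs to read DAG files
      readOnly: true
```

With GITSYNC_LINK set to "repo", the checked-out code then appears under /opt/airflow/dags/repo in the worker.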

Please note that we run the application as a specific user (65533). This is necessary for it to have access to the ssh key.
We also mount an additional volume backed by a Secret that holds, in base64 encoding, the private ssh key used for git clone. To encode the key you can use the following command:

base64 ~/.ssh/id_rsa -w 0 > temp.txt

The required value will appear in the temp.txt file; insert it into git-sync-secret.yaml:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-ssh-secret
data:
  gitSshKey: <base_64_ssh_key>

Do not publish base64-encoded data: it is just as easy to decode as it is to encode.

Let’s see what’s inside the git-sync container (Figure 2):

kubectl exec -it airflow-worker-0 -c git-sync -n airflow -- sh

Fig. 2. Contents of the /git directory in the git-sync container

And also what is inside the dags directory in Worker (Fig. 3):

kubectl exec -it airflow-worker-0 -c worker -n airflow -- /bin/bash

Fig. 3. Contents of the /opt/airflow/dags directory in the worker container

Figures 2 and 3 show that the contents of the two directories are the same. Since we synchronize the entire repository, the full path to the test DAG stored in the repository will be /opt/airflow/dags/repo/part2/dags/. Now we can create new DAGs in the repository and they will appear in the Airflow UI.

Saving Worker logs

To store data, k8s uses volumes. We met this construct in the previous part, when we mounted Secrets and ConfigMaps. This time we will use a volume to save logs. Of course, we could simply write them to some directory in the container, but then nothing guarantees that the logs survive an Airflow restart. It would be better to keep them on external disks that do not depend on the state of the cluster. A volume lets us do that, but to understand how, we need to get acquainted with two more types of Kubernetes resources:

  • PersistentVolume (PV)
    This is a resource that reserves space in a specific storage backend. One cluster can have several PersistentVolumes backed by different storage types, for example ssd and hdd drives. We will use yc-network-hdd. A PersistentVolume also defines accessModes, the volume access policy. We use ReadWriteOnce, which means the volume can be mounted for reading and writing by a single node (virtual machine) only.

  • PersistentVolumeClaim (PVC)
    This is an abstraction that allows a Pod to request a certain amount of space from a PersistentVolume.

It turns out that to allocate disk space for a Pod, you need to:

  • Create PV with a specific storage type and size

  • Create a PVC that claims space from the PV

  • Define a volume in the Pod manifest that refers to the PVC

To reduce the number of steps there is a Provisioner, which creates a PV for a PVC dynamically. In Yandex Cloud we do not need to configure anything extra: when a PVC is created, a PV of the same class and size is created automatically.
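
To make the chain "PVC, then volume in the Pod" more concrete, here is a minimal sketch that relies on dynamic provisioning, so no PV is written by hand. The names logs-claim and logs-demo, the busybox image, and the 5Gi size are hypothetical and chosen only for illustration; the storage class is the yc-network-hdd mentioned above.

```yaml
# Sketch only: a standalone PVC and a Pod that mounts it.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: logs-claim            # hypothetical name
  namespace: airflow
spec:
  accessModes: ["ReadWriteOnce"]
  storageClassName: yc-network-hdd
  resources:
    requests:
      storage: 5Gi            # illustrative size
---
apiVersion: v1
kind: Pod
metadata:
  name: logs-demo             # hypothetical name
  namespace: airflow
spec:
  containers:
    - name: app
      image: busybox:1.36
      # Append a line to a file on the persistent volume, then idle
      command: ["sh", "-c", "echo hello >> /logs/demo.log && sleep 3600"]
      volumeMounts:
        - name: logs
          mountPath: /logs
  volumes:
    # The Pod-level volume refers to the PVC by name
    - name: logs
      persistentVolumeClaim:
        claimName: logs-claim
```

If the Pod is deleted and recreated with the same claimName, the file written earlier should still be there, because the data lives on the PV rather than in the container.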

Since the Airflow worker (the component that writes the logs) is deployed as a StatefulSet, we do not have to create the PVC by hand; instead we specify volumeClaimTemplates in the StatefulSet manifest. This is necessary because each StatefulSet replica gets its own PVC (unlike a Deployment). Let’s extend our helm template for the workers as follows:

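{{- if not .Values.logs.persistence.enabled }}
      - name: logs
        emptyDir: {}
{{- end }}
{{- if .Values.logs.persistence.enabled }}
  volumeClaimTemplates:
  - metadata:
      name: logs
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: {{ .Values.logs.persistence.size }}
{{- end }}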
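
The template reads two values, logs.persistence.enabled and logs.persistence.size. A matching values.yaml fragment might look like the sketch below; the concrete size is an assumption chosen for illustration, not a value taken from the article's repository.

```yaml
# values.yaml fragment (sketch)
logs:
  persistence:
    # true  -> a PVC per worker replica is created via volumeClaimTemplates
    # false -> logs go to an emptyDir and are lost together with the Pod
    enabled: true
    # Requested PVC size (illustrative value)
    size: 5Gi
```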

Let’s check that the PVC and PV were created successfully (Fig. 4):

kubectl get pvc -n airflow

Fig. 4. Checking that the PVC and PV exist

Now let’s run a small experiment to see how PVCs behave when a Pod is deleted. To do this, I suggest starting one more worker Pod with the following command:

kubectl scale statefulsets airflow-worker --replicas=2 -n airflow

Fig. 5. Scaling workers

If we list the PVCs and PVs, we will see that one more volume has been added:

Fig. 6. Added PV and PVC

When returning the number of replicas to one, the new PVC and PV will not be deleted.
Thus, if something happens to one of the replicas, the logs will still remain and can be read after the replica is restored. Schematically, the picture can be represented as follows:

Fig. 7. Schematic representation of the StatefulSet and PVCs
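
The behaviour seen in this experiment (PVCs surviving a scale-down) corresponds to the default retention policy of a StatefulSet. In recent Kubernetes versions this policy can be spelled out explicitly. The fragment below is a sketch and assumes a cluster version in which the persistentVolumeClaimRetentionPolicy field is available.

```yaml
# Fragment of a StatefulSet spec (sketch). Both values below are the defaults.
spec:
  persistentVolumeClaimRetentionPolicy:
    # Keep PVCs when the number of replicas is reduced
    whenScaled: Retain
    # Keep PVCs when the StatefulSet itself is deleted
    whenDeleted: Retain
```

Setting either field to Delete would remove the corresponding PVCs automatically, which is usually not what we want for logs. The claims themselves are named after the template, the StatefulSet and the replica ordinal, for example logs-airflow-worker-0 and logs-airflow-worker-1.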

Setting up access from an external network for Webserver

Last time, we accessed the Airflow UI through a Service of type NodePort. In a production environment, of course, we would like to reach it by domain name rather than by IP address, and over https. For this we need an Ingress, a resource in which we describe traffic routing rules:

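apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airflow-ingress
  labels:
    release: airflow
  annotations:
    kubernetes.io/ingress.class: nginx
spec:
  rules:
  - host:
    http:
      paths:
      - backend:
          service:
            name: airflow-webserver
            port:
              name: airflow-ui
        path: /
        pathType: ImplementationSpecific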

The logic in rules is easy enough to follow. We state that we want to open the page at the given host (a free service can give you a hostname for an IP address), from which requests are directed to the airflow-webserver Service on the airflow-ui port (set to 8080 in values).
In addition, the annotations carry information about the ingress-controller. Why is it needed? The ingress-controller is the component that does all the actual work of routing traffic. Under the hood it may be built on Nginx, Traefik, and so on. In other words, in the Ingress we only describe the rules and state which ingress-controller we want to use, and the ingress-controller then implements these rules using nginx (in our case).
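
Newer Kubernetes versions also allow selecting the controller through the spec.ingressClassName field instead of an annotation. A minimal sketch, assuming the installed ingress-controller registers an IngressClass named nginx (worth checking with kubectl get ingressclass):

```yaml
# Fragment of the Ingress manifest (sketch): class selected via spec instead of an annotation.
spec:
  # Assumption: an IngressClass called "nginx" exists in the cluster
  ingressClassName: nginx
```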

But first you need to install this ingress-controller. This is easy to do using helm:

helm repo add ingress-nginx
helm repo update
helm upgrade --install ingress-nginx ingress-nginx/ingress-nginx -n ingress --create-namespace

We have installed ingress-controller in a separate namespace to logically separate it from our main service. Let’s see what will be created in this namespace:

Fig. 8. Contents of the ingress namespace

Among these resources, we are interested in the Service, namely its EXTERNAL-IP. The webserver will be available at this address, and we used it as part of the domain name in the Ingress manifest.
So, as soon as we create an Ingress resource in the cluster, the installed ingress-controller sees the nginx annotation, understands that we want traffic routed in a certain way, and reconfigures the application it uses for routing.
P.S. In YC, the ingress-controller will also be backed by a Network Load Balancer, but you do not need to configure it in any way: it is created automatically.

TLS Certificate

Great, but so far we only have access over http, and we would like https. To get it, we need to add a few lines to the Ingress and deploy a few more manifests in the cluster, namely cert-manager. Let’s do it with kubectl:

kubectl apply -f

Let’s deploy the ClusterIssuer:

kubectl apply -f cluster-issuer.yaml -n cert-manager
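
The contents of cluster-issuer.yaml are not shown in this article. A minimal sketch of what such a manifest might contain is given below. The issuer name letsencrypt matches the annotation value used in the Ingress manifest in this section, while the e-mail address, the account key secret name and the choice of the HTTP-01 solver are assumptions.

```yaml
# cluster-issuer.yaml (sketch, not the author's file)
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
  # Must match the value of the issuer annotation on the Ingress
  name: letsencrypt
spec:
  acme:
    # Let's Encrypt production ACME endpoint
    server: https://acme-v02.api.letsencrypt.org/directory
    # Placeholder contact address (assumption)
    email: admin@example.com
    # Secret where cert-manager stores the ACME account key (hypothetical name)
    privateKeySecretRef:
      name: letsencrypt-account-key
    solvers:
      # Answer HTTP-01 challenges through the nginx ingress-controller
      - http01:
          ingress:
            class: nginx
```

A ClusterIssuer is cluster-scoped, so it can serve Ingress resources in any namespace, including airflow.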

And add a few lines to the manifest with ingress:

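apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airflow-ingress
  labels:
    release: airflow
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt
spec:
  rules:
    # I use a purchased domain name here, because it is hard to get a certificate for a host from the free hostname service (too many people request certificates for that domain).
  - host:
    http:
      paths:
      - backend:
          service:
            name: airflow-webserver
            port:
              name: airflow-ui
        path: /
        pathType: ImplementationSpecific
  tls:
  - hosts:
    secretName: ingress-webserver-secret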

In the annotations we specify the issuer, and in the tls section we specify the secret in which the certificate for our host will be stored.
cert-manager works on the same principle as the ingress-controller. Thanks to the letsencrypt annotation, cert-manager understands that we want to use letsencrypt to obtain the certificate; it obtains the certificate and saves it to the specified secret.

If the certificate is not being issued (for example, when the host comes from the free hostname service mentioned earlier), or you need to find out the validity period of the certificate, you can use the following command:

kubectl describe certificate ingress-webserver-secret -n airflow
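
The name in this command is not arbitrary: when cert-manager handles an annotated Ingress, it creates a Certificate resource named after the secretName from the tls section. The generated object should look roughly like the sketch below; airflow.example.com is a placeholder host, not the one used in the article.

```yaml
# Approximate shape of the Certificate that cert-manager generates (sketch)
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: ingress-webserver-secret
  namespace: airflow
spec:
  # Secret that will hold the issued certificate and private key
  secretName: ingress-webserver-secret
  dnsNames:
    # Placeholder; the real value comes from the hosts list of the Ingress tls section
    - airflow.example.com
  issuerRef:
    name: letsencrypt
    kind: ClusterIssuer
```

The Status and Events sections of the describe output usually show why issuance is stuck.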

If everything is done correctly, we will see the coveted padlock in the browser:

Fig. 9. Checking the https connection


This time we set up synchronization of the remote repository with the Airflow directory, implemented permanent storage of logs, and also configured an https connection for the Webserver. After these transformations, our cluster can be depicted as follows (Fig. 10).

Fig. 10. Schematic illustration of the cluster with Airflow
