Airflow in Kubernetes. Part 1
Greetings!
On the path of a data engineer, there are often tasks related to DevOps. One of them is deploying Airflow in a Kubernetes cluster. If you have no prior experience with this kind of work, the task may seem far from trivial. Of course, you can run a few commands from the official guide, but if something then needs fixing, it will be hard to do without understanding what is happening inside. This article is intended to make the task easier. It will help those who have already worked with Airflow but have not yet touched Kubernetes.
To make the topic easier to follow, we will use a prepared repository. It is based on the official Airflow Helm chart, simplified for clarity.
Preliminary work
Airflow, which we will deploy, will have the following architecture (Fig. 1):
As you can see, it is a fairly standard setup. The executor type is Celery, the message broker is Redis, and the database is PostgreSQL.
Before moving on to the aspects of working with Kubernetes, there are a few preliminary things you need to do:
Creating a cluster. I recommend using a managed Kubernetes service in one of the clouds. Personally, I used Yandex Cloud; it provides instructions for creating a cluster.
Installing kubectl, a utility that lets you interact with the cluster.
Installing Helm. This tool is described in more detail below.
Creating a database. Here, too, you can use a ready-made cloud solution (Managed Service for PostgreSQL) or host one yourself.
Helm
Helm is used to install applications in a Kubernetes cluster. The tool provides a wide range of capabilities, but for now we are interested in one of them: templating.
The main entity in Helm is the chart. A Helm chart is a package containing templates and the information needed to render them. Let's take a closer look at the contents of our chart (see the repository). There we will find:
Chart.yaml – basic information about our chart
templates – a directory with all the templates that will be deployed to the Kubernetes cluster
values.yaml – where we define the variables used in the templates
For example, suppose we have an executor value that we want to use. Let's define it in values.yaml:
executor: "CeleryExecutor"
Now, to use this variable in the scheduler template, you need to write the following in the corresponding template file:
{{ .Values.executor }}
Looks like a Jinja template in Airflow, doesn't it?
However, sometimes it is inconvenient to put raw values straight into values.yaml; you may want to pre-process them first. For this we can use _helpers.tpl, which lives alongside the templates in the templates directory. For example, suppose we want to combine the repository and tag values for a Docker image:
{{- define "airflow_image" -}}
{{- $repository := .Values.images.airflow.repository -}}
{{- $tag := .Values.images.airflow.tag -}}
{{- printf "%s:%s" $repository $tag -}}
{{- end }}
Now we can call this function in the scheduler template:
{{ template "airflow_image" . }}
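To see what this produces, assume values.yaml defines the image roughly like this (the exact key layout is an assumption here; check the repository for the real one):

```yaml
# values.yaml fragment (assumed)
images:
  airflow:
    repository: apache/airflow
    tag: "2.6.2"
```

With these values, {{ template "airflow_image" . }} renders to apache/airflow:2.6.2, which matches the image references used in the manifests below.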
The repository also uses more complex functions; to understand what they do, you can look at the rendered manifests. To generate them, run:
helm template airflow dn-airflow/part1/.
I have also committed the result of this command to the repository. In this article I will use the already rendered templates.
Deployment (Scheduler)
So let's tell Kubernetes what we want it to do. To do this, we create manifests: YAML files in which we describe the desired state.
Let’s start with the manifest for the scheduler:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-scheduler
  labels:
    tier: airflow
    component: scheduler
    release: airflow
    chart: "airflow-1"
    executor: CeleryExecutor
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: scheduler
      release: airflow
  template:
    metadata:
      labels:
        tier: airflow
        component: scheduler
        release: airflow
    spec:
      initContainers:
        - name: wait-for-airflow-migrations
          image: apache/airflow:2.6.2
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
          args:
            - airflow
            - db
            - check-migrations
            - --migration-wait-timeout=60
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-fernet-key
                  key: fernet-key
            …
            - name: AIRFLOW__CORE__LOAD_EXAMPLES
              value: "True"
      containers:
        - name: scheduler
          image: apache/airflow:2.6.2
          imagePullPolicy: IfNotPresent
          args:
            - bash
            - -c
            - exec airflow scheduler
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-fernet-key
                  key: fernet-key
            …
            - name: AIRFLOW__CORE__LOAD_EXAMPLES
              value: "True"
          livenessProbe:
            initialDelaySeconds: 10
            timeoutSeconds: 20
            failureThreshold: 5
            periodSeconds: 60
            exec:
              command:
                - sh
                - -c
                - |
                  CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
                  airflow jobs check --job-type SchedulerJob --local
          volumeMounts:
            - name: logs
              mountPath: "/opt/airflow/logs"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
      volumes:
        - name: config
          configMap:
            name: airflow-airflow-config
        - name: logs
          emptyDir: {}
The first thing that catches our eye is the resource type:
kind: Deployment
To understand what a Deployment is, let's start the dive with a simpler entity: the Pod. A Pod is one or more containers that run together on the same node and share network and storage. In our case there are two: a container with the application and an init container that waits for migrations (more on this later).
Let us depict a Pod schematically (Fig. 2):
To run multiple replicas of the same Pod we need a ReplicaSet (Fig. 3), which ensures that the number of Pods matches the count declared in the manifest.
In production, Pod and ReplicaSet are usually not used on their own. Instead, their parent object is used: the Deployment (Fig. 4), because a Deployment controls updates of Pods. There are two update strategies. One of them is Rolling Update, which replaces Pods one by one; in many cases this is preferable to killing all Pods first and only then launching new ones with the new version of the application.
Let's analyze in detail the Deployment used for the Airflow scheduler:
apiVersion: apps/v1
We set the API version that we will use for our resource. To view supported versions for a cluster, you can use the command:
kubectl api-versions
metadata
Here we set the name of the Deployment and attach labels: optional metadata about our Deployment. Thanks to labels we can filter resources. For example, the command
kubectl get pods -n airflow -l component=scheduler
will return only the Pods labeled component=scheduler.
replicas
This is the number of replicas that will be deployed
selector
Here we describe how the ReplicaSet decides which Pods to manage. We use matchLabels to list the labels; exactly the same labels must appear in the template.
template
Here we describe the Pod template. We also set its metadata, in which we specify the labels (the same ones used by the selector).
spec
The specification of our pod includes information about containers, environment variables, mounted volumes and other information, which we will discuss below:
initContainers
These are containers that start before the main application. We use one to check whether migrations have already been applied, using the command:
airflow db check-migrations
The description of initContainers is similar to that of containers, which we will now consider in more detail:
containers
image: name of the image we will use
imagePullPolicy: the image pull policy. With IfNotPresent, Kubernetes pulls the image only if it is not already present on the node.
command and args: specify the command that is executed when the container starts.
env: environment variables that will be available inside the pod. Besides the usual ones with a name and a value, you can add variables whose value comes from a Secret, another Kubernetes resource that we discuss below.
livenessProbe
Needed so that if something goes wrong with the application running in the Pod, Kubernetes will notice and restart the broken Pod.
It is configured with the following settings:
initialDelaySeconds: how many seconds to wait before the first check. If the Pod takes a long time to start, set a higher value so that Kubernetes does not kill the Pod before the application is up.
periodSeconds: how often to run checks.
timeoutSeconds: The time Kubernetes waits for a response from the application. If the application does not respond during this time, then the check failed.
failureThreshold: the number of failed checks Kubernetes tolerates before killing the Pod.
exec: describes how we check the Pod's state. In our case, we run a shell script.
volumeMounts
Determines the paths at which the volumes described in volumes are mounted.
volumes
In our case there are two: config, into which we mount a ConfigMap (we discuss this resource below), and logs, for which we create a volume of type emptyDir. This storage lives only as long as the Pod does: after a Pod restart, the logs directory will be empty.
StatefulSet (Worker)
To run stateful applications (Redis and the worker in our case), we use a StatefulSet.
Unlike a Deployment, it starts Pods with predictable names, such as worker-0 and worker-1, and scales Pods in order. A StatefulSet also has a different policy for managing attached storage, which we will discuss in the next part. The reasons why the worker uses a StatefulSet can be seen here.
The template description is almost the same as for a Deployment, so we won't dwell on it.
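For orientation, the fields that actually distinguish a StatefulSet from a Deployment are few. A trimmed sketch is shown below; the serviceName value is an assumption, so check the repository for the real manifest:

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: airflow-worker
spec:
  serviceName: airflow-worker   # headless Service that gives each Pod a stable DNS name
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: worker
      release: airflow
  # template: ... (same structure as in the Deployment above)
```

The serviceName field is what enables the stable per-Pod names like worker-0 mentioned above.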
ConfigMap (Config)
When describing the scheduler, we mentioned the ConfigMap resource, which we mount at the path /opt/airflow/airflow.cfg. Let's look at it.
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-airflow-config
  labels:
    tier: airflow
    component: config
    release: airflow
    chart: "airflow-1"
    heritage: Helm
data:
  airflow.cfg: |-
    [celery]
    worker_concurrency = 16

    [core]
    dags_folder = /opt/airflow/dags
    executor = CeleryExecutor
    load_examples = True
Most of the fields are familiar to us, except data. Here we describe the name and contents of the file that will be mounted.
Secret (Airflow-fernet-key)
When examining the env block, we came across a resource called Secret. Let's look at it using the fernet-key secret as an example:
apiVersion: v1
kind: Secret
metadata:
  name: airflow-fernet-key
  labels:
    tier: airflow
    release: airflow
    chart: airflow
    heritage: Helm
  annotations:
    "helm.sh/hook": "pre-install"
    "helm.sh/hook-delete-policy": "before-hook-creation"
    "helm.sh/hook-weight": "0"
type: Opaque
data:
  fernet-key: "TTAxc05IQlBNakZsWVdwclEzSklXbFI2VkRWU01XUjFUM0JZVVV4aFV6ST0="
The description is almost the same as for a ConfigMap, but there is an additional field: type. Kubernetes has several types of secrets; we use Opaque, which holds arbitrary user data. The data field contains the secret itself, encoded in base64. Key creation and encoding are done with Helm.
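In our chart the encoding is handled by Helm, but you can reproduce it yourself with the base64 utility. A small sketch with a hypothetical key (not the real one from the manifest above):

```shell
# Hypothetical plaintext key, NOT the real key from the Secret above.
plain="my-fernet-key"

# Encode it the way a Secret's data field expects (base64).
encoded=$(printf '%s' "$plain" | base64)
echo "$encoded"   # bXktZmVybmV0LWtleQ==

# Decode to see what Kubernetes will actually inject into the container.
printf '%s' "$encoded" | base64 -d   # my-fernet-key
```

This round-trip is also handy for inspecting an existing Secret: kubectl prints the data values base64-encoded, and base64 -d recovers the plaintext.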
In annotations you can also see Helm hooks, which let you control the deployment order. Because we need the Secret to be created before the scheduler is deployed, we use the "pre-install" hook.
Job (Airflow-run-airflow-migrations)
For the scheduler we need a Pod that runs constantly. But what if we need to perform some action just once, for example, apply migrations? The Job resource is suited for this.
Remember that when describing the scheduler Deployment we defined an initContainer that checks whether migrations have already been applied? The migration itself is described in a Job:
apiVersion: batch/v1
kind: Job
metadata:
  name: airflow-run-airflow-migrations
  labels:
    tier: airflow
    component: run-airflow-migrations
    release: airflow
    chart: "airflow-1"
    heritage: Helm
spec:
  template:
    metadata:
      labels:
        tier: airflow
        component: run-airflow-migrations
        release: airflow
    spec:
      restartPolicy: OnFailure
      containers:
        - name: run-airflow-migrations
          image: apache/airflow:2.6.2
          imagePullPolicy: IfNotPresent
          args:
            - bash
            - -c
            - |-
              exec \
              airflow db upgrade
          env:
            - name: PYTHONUNBUFFERED
              value: "1"
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-fernet-key
                  key: fernet-key
            …
            - name: AIRFLOW__CORE__LOAD_EXAMPLES
              value: "True"
          volumeMounts:
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
      volumes:
        - name: config
          configMap:
            name: airflow-airflow-config
Here restartPolicy is a new field for us; the OnFailure value means the Job will be restarted if it fails.
Service (Airflow-webserver)
It remains to consider such a resource as Service.
This resource is required to provide access to our application. For now we will use it so that we can reach the Airflow UI from a browser. This is not a production solution, but it is just right for verifying that everything works:
apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver
  labels:
    tier: airflow
    component: webserver
    release: airflow
    chart: "airflow-1"
    heritage: Helm
spec:
  type: NodePort
  selector:
    tier: airflow
    component: webserver
    release: airflow
  ports:
    - name: airflow-ui
      port: 8080
There are several types of Services. We use NodePort, which exposes the Pod on the IP address of the node where it runs. To find this IP, you can open the Yandex Cloud console.
You also need to specify the port the webserver listens on; it must match the port in Airflow's own settings.
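One nuance worth knowing: a NodePort Service gets an external port from the 30000-32767 range, chosen by Kubernetes unless you pin it yourself. A sketch of the ports section with an explicitly pinned port (30080 is an arbitrary example, not a value from our chart):

```yaml
ports:
  - name: airflow-ui
    port: 8080        # port of the Service inside the cluster
    targetPort: 8080  # container port the webserver listens on
    nodePort: 30080   # optional: pin the external port; must be in 30000-32767
```

If nodePort is omitted, as in our manifest, Kubernetes allocates one at random, which is why we look it up with kubectl later.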
Namespace
Before deploying the application, we need to examine one more resource: Namespace. It lets you divide a cluster into logical groups. We will deploy all Airflow components in a single Namespace. Different access policies can also be applied per Namespace.
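A Namespace can be created declaratively like any other resource; a minimal manifest looks like this:

```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: airflow
```

In our case, however, we will not need to apply it by hand: the --create-namespace flag we pass to helm creates the namespace for us.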
Checking that everything is working
Great, we have described all our resources, let’s take a look at the diagram (Fig. 5) of what it all looks like now:
Let’s now try to run our chart.
First we clone the repository:
git clone git@github.com:Siplatov/dn-airflow.git
Next, replace the host, user, and password values for PostgreSQL with your own. These values are in values.yaml.
After this you can install the release:
helm upgrade --install -n airflow --create-namespace airflow dn-airflow/part1/.
After a few minutes, you can check whether the pods have already started (Fig. 6):
kubectl get pods -n airflow
We see that the Jobs completed successfully and all other Pods are in the Running status, which means everything is OK. However, it appears that the webserver has restarted several times. To understand why, run:
kubectl describe pod airflow-webserver-dc54c9884-g7x5s -n airflow
At the end we will see the following picture (Fig. 7):
The webserver did not manage to start within the time specified in the livenessProbe's initialDelaySeconds: Kubernetes ran several checks, concluded that the webserver was dead, and restarted it. Therefore it is worth increasing initialDelaySeconds.
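For example, the webserver's probe with a raised delay might look like this (60 is an illustrative value; pick one that matches your actual startup time):

```yaml
livenessProbe:
  initialDelaySeconds: 60   # raised from 10 to give the webserver time to boot
  timeoutSeconds: 20
  failureThreshold: 5
  periodSeconds: 60
```

The other settings can stay as they are; only the first check needs to be pushed past the slow startup.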
To test the airflow UI, let’s look at services (Fig. 8):
kubectl get services -n airflow
We see that we need to connect on port 32462.
If, when opening the link, we see a login page (Fig. 9), then everything started successfully:
The login and password are specified in values.yaml, under the defaultUser section.
Conclusion
We've looked at the minimum set of entities required to run Airflow. However, some points still need improvement: secrets live in a YAML file, logs are deleted on redeployment, there is no synchronization with a DAG repository, access is not over HTTPS, and so on. We will discuss how to fix these in the next part.