3 Ways to Run Spark on Kubernetes from Airflow

Hello! After I learned how to run spark-submit with a Kubernetes master and even got the expected result, it was time to schedule my task in Airflow. Then the question arose of how to do this properly. The World Wide Web offers several options, and it was not clear to me which one to choose. So I tried some of them, and now I will share my experience. Enjoy reading!

In this article I will talk about 3 methods using the following tools:

  • apache-airflow-providers-apache-spark

  • Apache Livy

  • Spark Operator

All the code used for these launch methods can be found here. There is also a video for those who prefer to absorb information in that form; in it I go through the launch examples in more detail.

Before we discuss each method and weigh its advantages and disadvantages, I want to say a few words about what running Spark on Kubernetes involves.

Running Spark on Kubernetes

To run Spark on Kubernetes we need:

  • Kubernetes cluster

  • Docker registry

  • A machine with openjdk and Spark installed (just an unpacked archive)

We also need to create a namespace and a service account that is granted the necessary rights. Then we build an image with our application and push it to the Docker registry, or use a public image and run the bundled example, as I will do.
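As a sketch of that preparation step, the namespace and service account could be created with a manifest like the one below. The names spark-jobs and spark are my assumptions, and binding to the built-in edit ClusterRole is one common way to grant the rights Spark needs (managing pods, services, and configmaps in the namespace):

```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: spark-jobs
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: spark-jobs
---
# Give the service account rights to manage pods, services and configmaps
# in its namespace by binding it to the built-in "edit" ClusterRole.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-role
  namespace: spark-jobs
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: spark-jobs
```

The service account is then passed to spark-submit via the spark.kubernetes.authenticate.driver.serviceAccountName configuration option.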

After this, we run the spark-submit command, in which you need to specify the k8s host and port, as well as the docker image:

spark-submit \
    --master k8s://https://<K8S_HOST>:<K8S_PORT> \
    --deploy-mode cluster \
    --name spark-pi \
    --conf spark.executor.instances=2 \
    --conf spark.kubernetes.container.image=SPARK_IMAGE \
    local:///opt/spark/examples/src/main/python/pi.py

The process can be schematically represented as follows:

After executing the command, we will see a driver pod in the cluster and, after a while, executor pods. It is worth noting that not only pods are created in the cluster, but also services and configmaps.

We can also extend the submit and add, for example, volumes, or provide pod manifest templates for the driver and executors. All of this is described in detail in the documentation.

Now that we've recalled how to run Spark on Kubernetes by hand, let's move on to why we're here: launching our application from Airflow.

Path 1. Apache-airflow-providers-apache-spark

The first thing that comes to mind when you need to launch something from Airflow is to look for a suitable provider package. One exists for our case too, so let's modify our Airflow image and install the necessary dependency:

FROM apache/airflow:2.8.0-python3.11
WORKDIR /opt/airflow
RUN pip install apache-airflow-providers-apache-spark
USER root
RUN apt-get update -y && apt-get install -y openjdk-17-jdk
USER 50000

However, as you may have noticed, you additionally need to install OpenJDK, because without it spark-submit will not run. After these manipulations the image size grows from 350 MB to 1.5 GB, which can have an unpleasant effect on the execution time of CI/CD processes.

The launch process itself can be represented as follows:

In the DAG we create a task like this:

submit = SparkSubmitOperator(
    task_id='submit',
    conn_id='spark',
    application='local:///opt/spark/examples/src/main/python/pi.py',
    name='spark-pi',
    conf={
        'spark.submit.deployMode': 'cluster',
        'spark.kubernetes.container.image': 'apache/spark-py',
        'spark.executor.memory': '500m',
        'spark.executor.instances': '1',
        'spark.kubernetes.executor.request.cores': '0.1',
        'spark.driver.memory': '500m',
    }
)

Create an Airflow connection:

Creating a spark connection
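The same connection can also be defined without the UI, via an environment variable (Airflow resolves `AIRFLOW_CONN_<CONN_ID>` automatically, and since Airflow 2.3 the value can be JSON). The host placeholder and the port 6443 below are assumptions for your cluster:

```shell
# Conn id is "spark", matching conn_id in the DAG task.
# <K8S_HOST> is a placeholder; 6443 is an assumed API server port.
export AIRFLOW_CONN_SPARK='{"conn_type": "spark", "host": "k8s://https://<K8S_HOST>", "port": 6443}'
```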

Now let's run the task:

Successful completion of submit

As you can see, only the spark-submit log is displayed, whereas I would like to see the driver log. It is also inconvenient that if the task fails with an error, you have to go to the cluster to get the logs:

submit failed

Therefore, the following conclusions can be drawn:

Pros:

  • Quick and easy to set up: install the provider package and create a connection

Minuses:

  • The Airflow image grows from 350 MB to 1.5 GB because of OpenJDK and the Spark dependencies

  • Only the spark-submit log is shown in Airflow; for driver logs you have to go to the cluster

In search of a solution that would eliminate these shortcomings, I came across Apache Livy.

Path 2: Apache Livy

Apache Livy is a separate service designed to simplify interaction with Spark. Instead of spark-submit we can now use REST, so in Airflow we only need the requests library. Besides launching applications, we can also monitor their status. Roughly speaking, spark-submit is now run not by Airflow but by Livy, which lets us avoid adding extra dependencies to the Airflow image.

Another advantage of Apache Livy

In addition, the service can create sessions that keep the Spark context alive, which allows you to interact with one Spark session from different Airflow tasks. A potentially useful feature that has already been written about here.
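As a sketch of that sessions API (the endpoint paths come from the Livy REST docs; the helper names and service address are my own assumptions), the requests could be built like this:

```python
import json
from urllib import request

LIVY_URL = "http://livy:8998"  # assumed in-cluster service address

def create_session_request(kind: str = "pyspark") -> request.Request:
    """POST /sessions starts a long-lived session that keeps the Spark context."""
    body = json.dumps({"kind": kind}).encode()
    return request.Request(f"{LIVY_URL}/sessions", data=body,
                           headers={"Content-Type": "application/json"})

def run_statement_request(session_id: int, code: str) -> request.Request:
    """POST /sessions/{id}/statements runs code inside that shared context."""
    body = json.dumps({"code": code}).encode()
    return request.Request(f"{LIVY_URL}/sessions/{session_id}/statements",
                           data=body, headers={"Content-Type": "application/json"})
```

Different Airflow tasks can then send statements to the same session id, reusing cached dataframes between them.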

But it's worth keeping in mind that this project is in Apache incubation status, which carries certain risks:

While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by The Apache Software Foundation.

Moreover, the project's git repository does not contain a single mention of Kubernetes. However, the temptation to launch Spark applications with an HTTP request is high enough to test this tool in practice. To do this, I slightly reworked the image from the official repository and wrote a small k8s manifest.

Now let's write a simple DAG:

@task()
def submit():
    headers = {'Content-Type': 'application/json'}

    data = '''{
        "name": "test-001",
        "className": "org.apache.spark.examples.SparkPi",
        "numExecutors": 1,
        "file": "local:///opt/spark/examples/src/main/python/pi.py",
        "conf": {
            "spark.kubernetes.driver.pod.name": "spark-pi-driver-001",
            "spark.kubernetes.container.image": "apache/spark-py",
            "spark.kubernetes.namespace": "airflow"
        }
    }'''

    response = requests.post('http://livy:8998/batches/', headers=headers, data=data)
    response.raise_for_status()

After launching this DAG, we will see that a new batch has appeared in the Livy interface. And the application worked successfully.

Livy interface with a working task

Also, using the API, we can track the status that is displayed in the interface in the screenshot above. But let's check whether this status is correct. To do so, let's run an application that fails:

Launching the application with an error (test-002)

As you can see, the status of task test-002 is success, so you cannot rely on the status that comes from Livy.

To track the status correctly, you can use the REST API of the Spark application itself; you can read more about this here. The author of that article also wrote an operator and a sensor for Airflow that run and monitor tasks in Livy. However, I did not try them, because the Livy scheme seemed quite complicated to me compared to the following launch method.

Pros:

  • No extra dependencies in the Airflow image: an HTTP request is enough

  • Sessions make it possible to reuse a Spark context between tasks

Minuses:

  • Incorrect application status in Livy; getting the true status requires extra steps

  • The driver log is not shown in the Airflow logs

  • Weak project support (incubation status)

Path 3: Spark Operator

Kubernetes allows you to define custom resources, and that is exactly what spark-operator does; its documentation and sources are located here. This project also has beta status, but there is a page listing companies that use it in production, and there are quite a lot of them.

The repository contains a Helm chart that is easy to install. To run applications from Airflow you also need to install the apache-airflow-providers-cncf-kubernetes package, which is needed to interact with the Kubernetes cluster. It is quite small compared to apache-airflow-providers-apache-spark from method 1.

The operating principle of Spark Operator is as follows: from Airflow we issue a command to create a custom Kubernetes resource, SparkApplication, and the operator then launches and monitors the state of the Spark application. We also start a sensor in Airflow that watches the status of the SparkApplication:

The Airflow DAG code for this process will look like this:

@dag(
    schedule=None,
    start_date=datetime(2024, 3, 1),
)
def spark_operator():


    submit = SparkKubernetesOperator(
        task_id="submit",
        namespace="airflow",
        application_file="spark-application.yaml",
        do_xcom_push=True,
        params={"app_name": "spark-pi"},
    )

    submit_sensor = SparkKubernetesSensor(
        task_id="submit_sensor",
        namespace="airflow",
        application_name="{{ task_instance.xcom_pull(task_ids='submit')['metadata']['name'] }}",
        attach_log=True,
    )

    submit >> submit_sensor

spark_operator()

You also need to place the spark-application.yaml file with the k8s manifest next to the DAG; in it we specify the required parameters:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
spec:
  type: Python
  pythonVersion: "3"
  timeToLiveSeconds: 1800
  mode: cluster
  image: "apache/spark-py"
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
  sparkVersion: "3.4.0"
  driver:
    coreLimit: "100m"
    coreRequest: "100m"
    memory: "500m"
  executor:
    coreLimit: "100m"
    coreRequest: "100m"
    instances: 1
    memory: "500m"

To put it simply, we run kubectl apply -f spark-application.yaml from Airflow. After the task is launched, we can see the new Kubernetes resource using the command:

kubectl get sparkapplications -n spark
Output of kubectl get sparkapplications -n spark command

After some time, the usual pods with drivers and executors will start.

But the most valuable thing is that in the sensor logs we will see Spark driver logs:

The value of Pi in the sensor logs

Moreover, if there is an error in the Spark application, the sensor task will also fail and show the application error, which makes it possible to track launches correctly:

The reason for the crash is visible in the Airflow logs

But not everything will be in the logs

However, if the driver fails due to OOM, we will not see this in the sensor logs, although the task will still fail. Errors of this kind can be found with the kubectl describe sparkapplication command, or you can add an operator task that also collects SparkApplication events.
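Under the hood, what the sensor keeps checking is the status subresource of the SparkApplication. A minimal sketch of that check (the field path status.applicationState.state and the COMPLETED/FAILED values come from the spark-operator v1beta2 API; the helper names are mine):

```python
def application_state(spark_app: dict) -> str:
    """Return the state of a SparkApplication custom resource as a dict,
    i.e. the status.applicationState.state field the Airflow sensor checks
    (e.g. "SUBMITTED", "RUNNING", "COMPLETED", "FAILED")."""
    return (spark_app.get("status", {})
                     .get("applicationState", {})
                     .get("state", "UNKNOWN"))

def is_terminal(state: str) -> bool:
    """Whether the sensor can stop waiting (assumed terminal states)."""
    return state in {"COMPLETED", "FAILED"}
```

A custom sensor or event collector could reuse these helpers on the dict returned by the Kubernetes API for the custom resource.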

Pros:

  • The driver log is visible in the Airflow sensor logs

  • Application failures propagate correctly to the Airflow task

  • Only the lightweight apache-airflow-providers-cncf-kubernetes package is needed in the Airflow image

Minuses:

  • The operator itself has to be installed and maintained in the cluster

  • Some failures (for example, driver OOM) do not appear in the sensor logs

Conclusion

I think all three methods have a right to exist. If you need to set up a launch quickly, method 1 is the better choice. If you already use Livy or need to keep the Spark context between launches, you can use method 2, without forgetting to obtain the correct statuses. And if you have basic knowledge of Kubernetes, option 3 fits best; at least, it is the one I decided to settle on. Thank you!
