Monitoring Spark Streaming in Kubernetes with Prometheus and Grafana


Introduction

Deploying Apache Spark to Kubernetes instead of using managed services such as AWS EMR, Azure Databricks, or HDInsight is usually motivated by cost-effectiveness and portability. You can read more about migrating from AWS EMR to K8s in this article.

However, a number of issues arise when you leave managed services, and probably the biggest one is the loss of monitoring and alerting. AWS EMR, for example, has some really powerful built-in monitoring tools in the form of CloudWatch, Ganglia, CloudTrail, and the YARN history server. In this article, we'll look at implementing monitoring for Apache Spark on Kubernetes using Prometheus and Grafana.

Task

Keeping Apache Spark running in production while it handles large amounts of data is a real challenge. A lot of things can go wrong: an executor can crash, the latency of external data sources can increase, performance can degrade due to changes in the input data or the code, and so on. To catch these problems early, you need to proactively track the relevant metrics in real time. These metrics include:

  1. Resource usage: number of cores, CPU time, memory used, maximum allocated memory, disk space used.

  2. Spark tasks: number of active / failed / completed tasks, maximum / average / minimum task duration.

  3. Spark shuffle: volume of shuffle reads / writes.

  4. Spark scheduler: number of active / failed / completed jobs.

  5. Spark streaming: number of receivers, number of started / failed / completed batches, number of received / processed records, average record processing time.

  6. Custom metrics: application-specific metrics should be monitored along with the system metrics.

Solution

Prometheus is one of the most popular Kubernetes monitoring tools: decentralized, open source, backed by a large community, and a member project of the Cloud Native Computing Foundation. Prometheus stores data as time series; queries are written in PromQL, and visualization is done in Grafana or the built-in expression browser.
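For example, a PromQL query in the expression browser or a Grafana panel typically computes a rate or an average over a time window; a minimal sketch (the metric name here is a placeholder, not a metric Spark actually exports):

# Per-second increase of a counter, averaged over the last 5 minutes (placeholder metric name)
rate(some_events_total[5m])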

Step 1: Setting up the sink

A combination of the built-in JmxSink and JmxExporter could be used to monitor Spark 2.x, but Spark 3.0 introduced a new sink: PrometheusServlet. Its advantages over JmxSink with JmxExporter are obvious: the dependency on an external JAR is eliminated, monitoring is served on the same network port that Spark already uses, and you can use Prometheus service discovery in Kubernetes.
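For example, if your Prometheus installation uses the common prometheus.io/* pod-annotation convention for Kubernetes service discovery (an assumption about your scrape configuration, not something Spark provides), the driver pod can be annotated roughly like this:

# Sketch: pod annotations for a scrape config that honors the prometheus.io/* convention
metadata:
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "4040"                  # Spark UI port on the driver
    prometheus.io/path: "/metrics/prometheus"   # PrometheusServlet path configured below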

To enable the new sink, add a metrics.properties file to the project (if it is not there already).

Add the following configuration to metrics.properties:

# Example configuration for PrometheusServlet
# Master metrics - http://localhost:8080/metrics/master/prometheus/
# Worker metrics - http://localhost:8081/metrics/prometheus/
# Driver metrics - http://localhost:4040/metrics/prometheus/
# Executors metrics - http://localhost:4040/metrics/executors/prometheus
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus
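If the file is not picked up automatically from $SPARK_HOME/conf, you can point Spark at it explicitly when submitting the application. A sketch of the relevant options (the file path assumes the /opt/spark/conf/ location used in the Dockerfile below, and spark.ui.prometheus.enabled is an experimental Spark 3.0 flag that exposes executor metrics on the driver):

--conf spark.metrics.conf=/opt/spark/conf/metrics.properties \
--conf spark.ui.prometheus.enabled=true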

Step 2: Deploy the Application

The Spark application is deployed to K8s as a Docker image. Below is an example multi-stage Dockerfile. The first stage compiles and builds the Spark Scala application with SBT, the second prepares the base Spark image, and the last assembles the final image. In the last stage, metrics.properties is copied to the /opt/spark/conf/ folder.

FROM openjdk:8 AS build

# Env variables
ENV SCALA_VERSION 2.12.12
ENV SBT_VERSION 1.2.8
# Install Scala
## Piping curl directly in tar
RUN curl -fsL https://downloads.typesafe.com/scala/$SCALA_VERSION/scala-$SCALA_VERSION.tgz | tar xfz - -C /root/ && \
    echo >> /root/.bashrc && \
    echo "export PATH=~/scala-$SCALA_VERSION/bin:$PATH" >> /root/.bashrc

# Install sbt
RUN curl -L -o sbt-$SBT_VERSION.deb https://dl.bintray.com/sbt/debian/sbt-$SBT_VERSION.deb && \
    dpkg -i sbt-$SBT_VERSION.deb && \
    rm sbt-$SBT_VERSION.deb && \
    apt-get update && \
    apt-get install -y sbt && \
    sbt sbtVersion && \
    mkdir project && \
    echo "scalaVersion := \"${SCALA_VERSION}\"" > build.sbt && \
    echo "sbt.version=${SBT_VERSION}" > project/build.properties && \
    echo "case object Temp" > Temp.scala && \
    sbt compile && \
    echo "done with compiling, starting deletion" && \
    rm -rf project && \
    rm -f build.sbt && \
    rm -f Temp.scala && \
    rm -rf target && \
    echo "done with deletion" && \
    mkdir -p /spark/ && \
    echo "created spark directory" && \
    curl -sL https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz | gunzip | tar x -C /spark/ && \
    echo "curled spark" && \
    #rm /spark/spark-3.0.1-bin-hadoop3.2/jars/kubernetes-*-4.1.2.jar && \
    echo "starting with wget" && \
    wget https://repo1.maven.org/maven2/io/fabric8/kubernetes-model-common/4.4.2/kubernetes-model-common-4.4.2.jar -P /spark/spark-3.0.1-bin-hadoop3.2/jars/ && \
    wget https://repo1.maven.org/maven2/io/fabric8/kubernetes-client/4.4.2/kubernetes-client-4.4.2.jar -P /spark/spark-3.0.1-bin-hadoop3.2/jars/ && \
    wget https://repo1.maven.org/maven2/io/fabric8/kubernetes-model/4.4.2/kubernetes-model-4.4.2.jar -P /spark/spark-3.0.1-bin-hadoop3.2/jars/ && \
    echo "done with wget"

# Define working directory
WORKDIR /opt/input

# Project Definition layers change less often than application code
COPY app/build.sbt ./
WORKDIR /opt/input/project
# COPY project/*.scala ./
COPY app/project/build.properties ./
COPY app/project/*.sbt ./

WORKDIR /opt/input
RUN sbt reload

# Copy rest of application
COPY app ./
RUN sbt testCoverage
RUN SBT_OPTS="-Xms2048M -Xmx2048M -Xss1024M -XX:MaxMetaspaceSize=2048M" sbt 'set test in assembly := {}' clean assembly

FROM openjdk:8-alpine AS spark

# install python
ENV PYTHONUNBUFFERED=1
RUN apk add --update --no-cache python3 && ln -sf python3 /usr/bin/python
RUN python3 -m ensurepip
RUN pip3 install --no-cache --upgrade pip setuptools

ARG spark_home=/spark/spark-3.0.1-bin-hadoop3.2

RUN set -ex && \
    apk upgrade --no-cache && \
    apk add --no-cache bash tini libc6-compat gcompat linux-pam nss && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/work-dir && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd

COPY --from=build ${spark_home}/jars /opt/spark/jars
COPY --from=build ${spark_home}/bin /opt/spark/bin
COPY --from=build ${spark_home}/sbin /opt/spark/sbin
COPY --from=build ${spark_home}/kubernetes/dockerfiles/spark/entrypoint.sh /opt/

FROM spark AS final

ENV SPARK_HOME /opt/spark
RUN mkdir /opt/spark/conf

COPY scripts/entrypoint.sh /tmp/
COPY app/src/main/resources/log4j.properties /opt/spark/conf/
COPY app/src/main/resources/metrics.properties /opt/spark/conf/
COPY --from=build /opt/input/target/scala-2.12/legion-streaming-assembly-0.2.jar  /opt/spark/jars


WORKDIR /opt/spark/work-dir
RUN wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
RUN tar -xzf kafka_2.12-2.2.1.tgz

RUN chmod +x /tmp/entrypoint.sh
ENTRYPOINT ["/tmp/entrypoint.sh"]
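Once the image is built and pushed to a registry, the application can be submitted to the cluster. A rough sketch of the spark-submit call (the API server address, image name, namespace, service account, and main class are placeholders for your own values; the JAR path matches the COPY step above):

spark-submit \
  --master k8s://https://<k8s-apiserver>:6443 \
  --deploy-mode cluster \
  --name legion-streaming \
  --class <your.main.Class> \
  --conf spark.kubernetes.namespace=<namespace> \
  --conf spark.kubernetes.container.image=<registry>/legion-streaming:latest \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=<service-account> \
  local:///opt/spark/jars/legion-streaming-assembly-0.2.jar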

Step 3: Custom metrics

For each application, there comes a time when you need some application-specific metrics from it: the execution time of certain methods, key statistics about its internal state, health checks, etc. To do this, the application should be instrumented so that it can expose metrics to Prometheus. This can be done with the Dropwizard Metrics Java library. For example, a metric source can be implemented as follows (source):

package org.apache.spark.metrics.source

import com.codahale.metrics._

// Companion object holding a single shared instance of the metrics source
object LegionMetrics {
  val metrics = new LegionMetrics
}

// Custom Spark metrics source backed by Dropwizard Metrics
class LegionMetrics extends Source {
  override val sourceName: String = "LegionCommonSource"
  override val metricRegistry: MetricRegistry = new MetricRegistry
  val runTime: Histogram = metricRegistry.histogram(MetricRegistry.name("legionCommonRuntime"))
  val totalEvents: Counter = metricRegistry.counter(MetricRegistry.name("legionCommonTotalEventsCount"))
  val totalErrors: Counter = metricRegistry.counter(MetricRegistry.name("legionCommonTotalErrorsCount"))
}
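Depending on how the application is wired up, the source may still need to be registered with Spark's metrics system before it shows up. One possible approach is a small helper that registers the companion object's instance once at startup (a sketch; it has to live under the org.apache.spark package tree because MetricsSystem is package-private, which is also why the source above is declared in org.apache.spark.metrics.source):

package org.apache.spark.metrics.source

import org.apache.spark.SparkEnv

object LegionMetricsRegistrar {
  // Call once after the SparkSession/SparkContext has been created
  def register(): Unit =
    SparkEnv.get.metricsSystem.registerSource(LegionMetrics.metrics)
}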

You can now update these counters, histograms, gauges, or timers from anywhere in your code. All metrics are automatically made available over HTTP.

LegionMetrics.metrics.totalEvents.inc(batch.count())
LegionMetrics.metrics.runTime.update(System.currentTimeMillis - start)

After deploying the Spark application, you can connect to the driver pod in the K8s cluster and verify that all Spark and custom metrics are available, for example using curl.
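One way to do this is to forward the driver UI port locally and query the endpoints configured earlier (the pod name is a placeholder; 4040 is the default driver UI port):

kubectl port-forward <driver-pod-name> 4040:4040
curl -s http://localhost:4040/metrics/prometheus/
curl -s http://localhost:4040/metrics/executors/prometheus/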

Spark Structured Streaming requires an additional configuration option: spark.sql.streaming.metricsEnabled.

If spark.sql.streaming.metricsEnabled is set to true, you will see additional metrics: latency, processing rate, number of state rows, event-time watermark, etc.
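This option can be passed via spark-submit or set programmatically before starting the query; for example:

// Enable reporting of Structured Streaming metrics through the Dropwizard metrics system
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")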

Step 4: Dashboards in Grafana

After deploying the Spark application and configuring Prometheus, you can move on to creating a dashboard in Grafana. Create a new dashboard, select the Prometheus datasource and enter your query.
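As a sketch, a panel query might compute the rate of the custom events counter defined earlier (the metric name below is hypothetical; the exact name produced by PrometheusServlet depends on your application name/ID and the naming convention noted below):

# Hypothetical metric name; check /metrics/prometheus/ for the exact spelling
rate(metrics_legion_streaming_driver_LegionCommonSource_legionCommonTotalEventsCount_Count[5m])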

Keep in mind that PrometheusServlet follows the Spark 2.x metric naming convention rather than the Prometheus naming convention. Choose the metrics you need and place them on the dashboard.

You can also use Prometheus Alertmanager to define alerts for important metrics. For example, it is recommended to create alerts for failed jobs, long-running tasks, large shuffles, latency exceeding the batch interval (for streaming), etc.
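A Prometheus alerting rule for one of these cases could look roughly like the sketch below (the metric name and thresholds are placeholders to be replaced with the names your application actually exposes):

# Sketch of a Prometheus alerting rule; metric name and threshold are placeholders
groups:
  - name: spark-streaming
    rules:
      - alert: SparkFailedJobs
        expr: increase(<failed_jobs_metric>[10m]) > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Spark application reported failed jobs in the last 10 minutes"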

Summary

If you are using Apache Spark to process huge amounts of data and care about cost efficiency and portability, you are probably considering Kubernetes or already using it. Apache Spark 3 takes another big step towards K8s. Monitoring and alerting for Apache Spark on K8s are easy to set up with the built-in Prometheus support, and the result is comparable to the functionality found in managed services such as AWS EMR.


