Spark Performance Secrets, or Why Query Compilation Matters



Criteo is a data-driven company. Every day, tens of terabytes of new data are passed through our systems to train recommendation models that process requests across the Internet. Spark is our primary tool for processing big data. It is a powerful and flexible tool, but it is quite difficult to learn, and to use it effectively, you often need to read the source code of the platform.

Fast processing of big data is critical to our business:

  • we frequently update our models to improve their performance for our clients;

  • we bring new machine learning products to market faster when we are able to iterate quickly;

  • the cost of data processing translates directly into infrastructure costs.

In this article, I’ll walk you through writing efficient Spark code and use examples to demonstrate some common pitfalls. I will show that in most cases Spark SQL (Datasets) should be preferred over the Spark Core API (RDD), and that making the right choice can improve the performance of big data processing by a factor of 2-10, which is very significant.

Experimental configuration

Spark 2.4.6, Macbook Pro 2017 with Intel Core i7 3.5GHz

Measurements are always taken on a warmed-up Java virtual machine (the code is run 100 times and the average is taken over the last 90 runs). The code in this article is written in Scala, but its conclusions should hold for Python as well.
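
As an illustration, here is a minimal sketch of such a measurement harness (a hypothetical helper, not the benchmark code actually used for the article): it runs the workload 100 times on the same JVM and averages the last 90 runs.

def benchmark(runs: Int = 100, discard: Int = 10)(body: => Unit): Double = {
  // Run the workload `runs` times, recording each duration in milliseconds
  val timesMs = (1 to runs).map { _ =>
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1e6
  }
  // Drop the first `discard` runs (JVM warm-up) and average the rest
  timesMs.drop(discard).sum / (runs - discard)
}

// Example usage:
// val avgMs = benchmark() { spark.range(1000L * 1000 * 1000).count() }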

Big data processing misconceptions

It is widely believed that there are two main performance bottlenecks in big data processing:

  • data shuffling, because it requires sending data over the network;

  • disk I/O, because accessing data on disk is always much slower than accessing data in RAM.

These notions have a historical basis – in 2006, when the Hadoop library first appeared, conventional hard drives were slow and unreliable, and MapReduce was the main platform for processing big data. It was the sluggishness of hard drives that spurred the development of in-memory processing tools like Spark. Since then, hardware performance has improved significantly.

A 2015 study by Kay Ousterhout et al.¹ analyzed bottlenecks in Spark jobs and found that execution time is largely determined by CPU-bound operations rather than by I/O or network transfer. In particular, the authors ran a wide range of queries over three benchmark datasets, including TPC-DS², and determined that:

  • if network bandwidth were unlimited, job completion time could be reduced by 2% (median);

  • if disk I/O throughput were unlimited, job completion time could be reduced by 19% (median).

A very unexpected result! It turns out that disk I/O has a much larger impact on performance than transferring data over the network. There are several reasons for this:

  • Spark uses disk I/O not only when reading the input dataset and writing the result, but also during job execution, to cache data and to spill data that does not fit in RAM to disk.

  • Analytical jobs often involve aggregation, so the amount of data sent over the network is usually smaller than the amount initially read from disk.

Interestingly, Databricks engineers came to the same conclusions around 2016³, which led them to refocus Spark development on optimizing CPU usage. The result was the introduction of SQL support, the DataFrames API and, later, Datasets.

How fast is Spark?

Let’s look at a simple problem: counting, in a naive way, the even numbers between 0 and 10⁹. Spark is not strictly required for such a task, so first we will write a plain Scala program:

var res: Long = 0L
var i: Long  = 0L
while (i < 1000L * 1000 * 1000) {
  if (i % 2 == 0) res += 1
  i += 1L
}

Listing 1. Naive counting

Now let’s compute the same result using Spark RDDs and Spark Datasets. To keep the comparison fair, I run Spark in local[1] mode:

val res = spark.sparkContext
  .range(0L, 1000L * 1000 * 1000)
  .filter(_ % 2 == 0)
  .count()

Listing 2. Counting with RDD

val res = spark.range(1000L * 1000 * 1000)
  .filter(col("id") % 2 === 0)
  .select(count(col("id")))
  .first().getAs[Long](0)

Listing 3. Counting with Datasets

The measured execution times are as follows: unsurprisingly, the handwritten code is the most efficient solution; surprisingly, the RDD version is about five times slower, while the Datasets version takes nearly the same time as the handwritten code.

The Datasets paradox

The paradox: the Datasets API is built on top of RDDs, yet it is much faster, almost as fast as code handwritten for the specific task. How is this even possible? The answer lies in the new execution model.

The past – the Volcano model

Code written with RDDs is executed according to the Volcano runtime model. In practice, this means that each RDD follows a standard interface:

  • it knows its parent RDDs;

  • it provides, through the compute method, access to an Iterator[T] over the elements of the RDD (the method is intended for internal use by Spark developers).

abstract class RDD[T: ClassTag] {
  def compute(…): Iterator[T]
}

Listing 4. RDD.scala

With these properties in mind, a simplified implementation of an RDD-based count (ignoring partitioning) looks like this:

def pseudo_rdd_count(rdd: RDD[T]): Long = {
  val iter = rdd.compute
  var result = 0L
  while (iter.hasNext) {
    iter.next() // consume the element, otherwise the loop would never advance
    result += 1
  }
  result
}

Listing 5. Pseudocode for an RDD-based counting action

Why is this code significantly slower than the handwritten code in Listing 1? There are several reasons:

  • Virtual function calls on iterators: each call to Iterator.next() incurs overhead compared to non-virtual functions, which the compiler or JIT can inline.

  • Lack of CPU-level optimization: the Java virtual machine and the JIT cannot optimize the bytecode produced by Listing 5 as well as the bytecode produced by Listing 1. In particular, the handwritten code allows the JVM’s JIT to keep intermediate results in a CPU register instead of writing them to main memory.

The present – whole-stage code generation

Code written with Spark SQL⁵ is executed differently from code written with the RDD API. When an action is triggered, Spark generates code that collapses multiple data transformations into a single function. This process is called whole-stage code generation⁶. Spark is essentially emulating what a developer would write by hand for the specific task: code without virtual function calls, which the JVM/JIT can execute more efficiently. Spark actually generates quite a lot of code; see, for example, the code it generates for Listing 3.

Technically, Spark only generates high-level Java source code, while bytecode generation is handled by the Janino compiler⁴. This is what makes Spark SQL so fast compared to RDDs.
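
To see this in action (assuming a standard Spark 2.x session), you can print the physical plan of the query from Listing 3 and dump the Java source that Spark generates for it; in the plan, operators prefixed with an asterisk are the ones covered by whole-stage code generation:

import org.apache.spark.sql.functions.{col, count}

// Print the physical plan; operators marked with "*" are fused into a single
// generated function by whole-stage code generation.
spark.range(1000L * 1000 * 1000)
  .filter(col("id") % 2 === 0)
  .select(count(col("id")))
  .explain()

// Developer API that prints the generated Java source for each code-generated stage.
import org.apache.spark.sql.execution.debug._
spark.range(1000L * 1000 * 1000)
  .filter(col("id") % 2 === 0)
  .select(count(col("id")))
  .debugCodegen()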

Using Spark effectively

Spark today has three Scala/Java APIs: RDD, Datasets and DataFrames (now merged with Datasets). RDDs are still widely used in Spark, in part because most existing jobs were written against this API and it is tempting to keep things as they are. However, as the tests show, switching to the Datasets API can provide huge performance gains thanks to optimized CPU usage.

The wrong approach – the classic way

The most common problem I’ve encountered when using Spark SQL is explicitly switching to the RDD API. This is because it is often easier for a programmer to formulate a computation in terms of Java objects than using the limited Spark SQL language:

val res = spark.range(1000L * 1000 * 1000)
    .rdd
    .filter(_ % 2 == 0)
    .count()

Listing 6. Switching from Dataset to RDD

This code runs in 43 seconds instead of the original 2.1 seconds, while doing exactly the same thing. The explicit switch to RDD breaks whole-stage code generation and triggers the conversion of dataset elements from optimized primitive types into Java objects, which turns out to be very expensive. If we compare the execution diagrams for the code in Listings 3 and 6 (see below), we can see that an additional stage appears in the second case.

Figure 1. Visual representation of the stages for Listing 3 (diagram a) and Listing 6 (diagram b)

The wrong approach – the sophisticated way

Spark SQL’s performance is surprisingly fragile. This minor change results in a threefold increase in query execution time (up to 6 seconds):

val res = spark
  .range(1000L * 1000 * 1000)
  .filter(x => x % 2 == 0) // note that the condition changed
  .select(count(col("id"))) 
  .first()
  .getAs[Long](0)

Listing 7. Replacing a Spark SQL Expression with a Scala Function

Spark is unable to generate efficient code for this filter condition. The condition is a Scala anonymous function rather than a Spark SQL expression, so Spark has to deserialize each record from its optimized internal representation in order to call the function. What is remarkable is that this change does not affect the visual representation of the stages (Figure 1a), so it cannot be detected by inspecting the job’s directed acyclic graph (DAG) in the Spark UI.

Spark SQL’s high performance comes from restricting the range of available operations – something has to be sacrificed! To get the best performance, use transformations that operate on columns: prefer filter(condition: Column) to filter(T => Boolean), and select(…) to map(…). These do not force Spark to rebuild the Java object represented by a single row of the Dataset. And, of course, avoid switching to RDDs.
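
As a brief illustration, here is a minimal sketch contrasting the two styles on a hypothetical Dataset df with two Long columns, price and quantity (df and its column names are assumptions made for this example, not taken from the article):

import org.apache.spark.sql.functions.col
import spark.implicits._ // encoders, needed for the map example below

// Column-based: Catalyst sees the whole expression, keeps the data in its
// optimized internal format and can fold the projection into generated code.
val columnBased = df.select((col("price") * col("quantity")).as("total"))

// Lambda-based: every row is first deserialized into a JVM object so that the
// opaque Scala function can run, which defeats those optimizations.
// (Assumes price and quantity are the first two columns.)
val lambdaBased = df.map(row => row.getLong(0) * row.getLong(1))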

Conclusion and concluding remarks

The simple examples in this article demonstrate that most of the execution time of big data jobs is not spent doing useful work. A good solution to this problem is query compilation, which is available through Spark SQL and makes better use of modern hardware. Recent research suggests that executing queries efficiently matters more for standard big data processing than optimizing network and disk I/O utilization.

Correct application of query compilation can reduce processing time by 2-10 times, which means faster experimentation, lower infrastructure costs, and tremendous pleasure in doing your job elegantly!

Code samples from this article can be found here. Using this repository, you can analyze the performance of various Spark queries.

References

  1. Ousterhout, Kay, et al. “Making Sense of Performance in Data Analytics Frameworks.” 12th USENIX Symposium on Networked Systems Design and Implementation (NSDI 15), 2015.

  2. http://www.tpc.org/tpcds/

  3. https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

  4. https://janino-compiler.github.io/janino/

  5. http://people.csail.mit.edu/matei/papers/2015/sigmodsparksql.pdf

6. https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html


