Chaining custom DataFrame transformations in Spark

This translation was prepared as part of enrollment for the online course “Hadoop, Spark, Hive Ecosystem.”



To chain DataFrame transformations in Spark, you can use implicit classes or the Dataset#transform method. This blog post demonstrates how to build DataFrame transformation chains and explains why the Dataset#transform method is preferable to implicit classes.

Structuring Spark code as DataFrame transformations distinguishes strong Spark programmers from “spaghetti hackers,” as detailed in the article “Writing Beautiful Spark Code.” Following the advice in that article will make your Spark code much easier to test and reuse.

If you are using PySpark, see this article on chaining custom PySpark DataFrame transformations.

Dataset transform method

The Dataset transform method provides a “concise syntax for chaining custom transformations.”

Suppose we have a withGreeting() method that adds a greeting column to a DataFrame, and a withFarewell() method that adds a farewell column to a DataFrame:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

def withGreeting(df: DataFrame): DataFrame = {
  df.withColumn("greeting", lit("hello world"))
}

def withFarewell(df: DataFrame): DataFrame = {
  df.withColumn("farewell", lit("goodbye"))
}

We can use the transform method to run the withGreeting() and withFarewell() methods:

val df = Seq(
  "funny",
  "person"
).toDF("something")

val weirdDf = df
  .transform(withGreeting)
  .transform(withFarewell)
weirdDf.show()

+---------+-----------+--------+
|something|   greeting|farewell|
+---------+-----------+--------+
|    funny|hello world| goodbye|
|   person|hello world| goodbye|
+---------+-----------+--------+
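Under the hood there is no magic: in Spark’s source, Dataset#transform simply applies the given function to this, i.e. it is essentially def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this). The plain-Scala sketch below mimics that shape with a made-up Frame class (a stand-in for illustration, not a real Spark type), so no Spark session is needed:

```scala
// Toy stand-in for Dataset: transform is just function application,
// which is what makes left-to-right chaining possible.
case class Frame(columns: List[String]) {
  def transform(t: Frame => Frame): Frame = t(this)
}

// Analogues of the post's withGreeting / withFarewell methods.
def withGreeting(f: Frame): Frame = Frame(f.columns :+ "greeting")
def withFarewell(f: Frame): Frame = Frame(f.columns :+ "farewell")

val result = Frame(List("something"))
  .transform(withGreeting)
  .transform(withFarewell)
// result.columns: List("something", "greeting", "farewell")
```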

The transform method is easily combined with built-in Spark DataFrame methods such as select:

df
  .select("something")
  .transform(withGreeting)
  .transform(withFarewell)

Without the transform method, we would have to nest the method calls, and the code would become less readable:

withFarewell(withGreeting(df))

// even worse
withFarewell(withGreeting(df)).select("something")
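Note that the argument to transform is just a function from one DataFrame to another, so an inline anonymous function works too. This can be handy for a one-off tweak that doesn’t deserve a named method (a sketch; df and withGreeting are as defined above, and the "inline" column is a made-up example):

```scala
// transform accepts any DataFrame => DataFrame function, so an anonymous
// function can be mixed into the chain alongside named transformations.
df
  .transform(withGreeting)
  .transform(d => d.withColumn("inline", lit("one-off column")))
```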

Transform method with arguments

Custom DataFrame transformations that take arguments can also use the transform method, thanks to currying / multiple parameter lists in Scala.

Let’s use the same withGreeting() method as before and add a withCat() method that takes a string as an argument:

def withGreeting(df: DataFrame): DataFrame = {
  df.withColumn("greeting", lit("hello world"))
}

def withCat(name: String)(df: DataFrame): DataFrame = {
  df.withColumn("cats", lit(s"$name meow"))
}

We can use the transform method to run the withGreeting() and withCat() methods:

val df = Seq(
  "funny",
  "person"
).toDF("something")

val niceDf = df
  .transform(withGreeting)
  .transform(withCat("puffy"))
niceDf.show()

+---------+-----------+----------+
|something|   greeting|      cats|
+---------+-----------+----------+
|    funny|hello world|puffy meow|
|   person|hello world|puffy meow|
+---------+-----------+----------+

The transform method can be used with custom DataFrame transformations that take arguments!
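The trick is that with two parameter lists, supplying only the first list yields a function of the remaining parameter, which is exactly the shape transform expects. Here is a plain-Scala sketch of that mechanism (no Spark needed; withSuffix is a made-up stand-in for withCat):

```scala
// With multiple parameter lists, partially applying the first list
// produces a function of the second - the same mechanism that lets
// withCat("puffy") slot directly into transform.
def withSuffix(suffix: String)(s: String): String = s + suffix

// Supplying only the first parameter list yields a String => String function.
val addMeow: String => String = withSuffix(" meow")

val result = addMeow("puffy")
// result: "puffy meow"
```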

Monkey patching with implicit classes

Implicit classes can be used to add methods to existing classes. The following code adds the same withGreeting() and withFarewell() methods to the DataFrame class itself:

object BadImplicit {

  implicit class DataFrameTransforms(df: DataFrame) {

    def withGreeting(): DataFrame = {
      df.withColumn("greeting", lit("hello world"))
    }

    def withFarewell(): DataFrame = {
      df.withColumn("farewell", lit("goodbye"))
    }

  }

}

The withGreeting() and withFarewell() methods can then be chained and executed as follows:

import BadImplicit._

val df = Seq(
  "funny",
  "person"
).toDF("something")

val hiDf = df.withGreeting().withFarewell()

Extending core classes works, but this is bad programming practice and should be avoided.

Avoiding implicit classes

Changing base classes is known as monkey patching and is a delightful feature of Ruby, but can be perilous in untutored hands.
– Sandi Metz

Sandi’s comment was directed at the Ruby programming language, but the same principle applies to Scala implicit classes.

Monkey patching is generally frowned upon in the Ruby community and should be avoided in Scala.

Spark was kind enough to provide a transform method, so you don’t need to monkey patch the DataFrame class. With a little Scala programming, we can even make transform work with custom transformations that take arguments. This makes the transform method the clear winner!

