Optimizing Shuffle in Spark

Recently, we noticed that a significant part of our Spark applications' runtime was spent exchanging data (Shuffle) between executors. In this article, I will describe the optimizations that helped us get rid of the heaviest Shuffle operations. It covers not only BroadcastJoin, but also two other non-obvious techniques: preliminary repartition and bucketing.

What is Shuffle

Shuffle is the operation of redistributing data between partitions of a dataframe. It is required for wide transformations such as join, groupBy, distinct, dropDuplicates and window functions, so in any Spark application Shuffle is almost inevitable. At the same time, Shuffle is a very time-consuming and resource-intensive operation.
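For illustration, here is a minimal sketch (assuming an existing SparkSession named spark; the customer_id column is made up) showing how a wide transformation introduces an Exchange (Shuffle) node into the physical plan, while a narrow one does not:

from pyspark.sql import functions as F

df = spark.range(1_000_000).withColumn("customer_id", F.col("id") % 1000)

# Narrow transformation: the plan contains no Exchange node
df.filter(F.col("customer_id") == 42).explain()

# Wide transformation: the plan contains
# "Exchange hashpartitioning(customer_id, ...)"
df.groupBy("customer_id").count().explain()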

Let's take a closer look at the stages that make up Shuffle:

  1. Calculating the hash of the transformation key: for each row of data, Spark computes a hash of the transformation key. For example, for the transformation groupBy("customer_id"), Spark calculates the hash of the customer_id column.

  2. Data compression: Before exchanging data between workers, Spark serializes and compresses it to reduce network and disk load.

  3. Data exchange: data is redistributed between executors so that all rows with the same hash are in the same partition on one executor. This process often requires writing all data to disk and then reading this data back in the correct order.

  4. Unpacking and transforming data: Once the exchange is complete, Spark unpacks the data and converts it into an RDD or DataFrame for further processing.

Shuffle places a heavy load on compute resources (serialization, compression, decompression, and deserialization of data) as well as on the network and disk (during the data exchange). Information about the amount of data transferred can be found in the Spark UI.

It often happens that Shuffle takes up most of an application's execution time. In this article, we will discuss three query transformation techniques that allow you to get rid of some Shuffle operations:

  • BroadcastJoin: the hint .hint("broadcast") removes the Shuffle when joining a small dataframe.

  • Repartition: a .repartition() call placed in the right spot can eliminate several Shuffles at once.

  • Bucketing: a way of storing tables that avoids a Shuffle when reading them.

I will tell you more about each of them below.

BroadcastJoin

The Join operation is a wide transformation that requires redistributing the data of both dataframes between partitions, which we can see in the execution plan:

Spark repartitions (Shuffles) both dataframes by join key to ensure that rows with the same hash are in the same partition, and then performs a SortMergeJoin.

When one of the dataframes is very small, Spark optimizes the execution plan and performs a BroadcastJoin instead of a regular Join. In this case, Spark sends the smaller dataframe to all workers, which avoids a Shuffle of the other dataframe being joined. This optimization is controlled by the threshold spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB.
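The threshold can be inspected and changed at runtime through the session configuration; a quick sketch (the 100 MB value is just an example):

# Current value of the broadcast threshold
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Raise the threshold to 100 MB for the current session
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)

# Setting it to -1 disables automatic BroadcastJoin entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)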

How does Spark estimate the size of a dataframe? Let's look at some examples:

from pyspark.sql import functions as F

# DataFrame[id: bigint]
# Exact estimate: 3000000 * 8 B
df = spark.range(3_000_000)            # 22.9 MB

# A second dataframe of the same size (for the join example below)
df2 = spark.range(3_000_000)

# DataFrame[id: bigint, id: bigint]
# Upper-bound estimate: 24000000 * 24000000 B
df.join(df2, on=df.id == df2.id)       # 523.9 TB

df.write.saveAsTable("saved_df")

# The table is saved in Hive and stored in the compressed parquet format,
# so Spark gets its size from Hive
df = spark.table("saved_df")           # 11.5 MB

# Upper-bound estimate
df = df.filter(F.col("id") % 30 == 0)  # 11.5 MB

# Exact estimate: 3000000 / 30 * 8 B
df.cache().count()

df                                     # 781.3 KB
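To see where these numbers come from, you can ask Spark to print the optimized plan together with its statistics; a sketch, assuming Spark 3.x (df_est is just an illustrative name):

# "cost" mode prints the optimized logical plan with
# Statistics(sizeInBytes=...) for each node -- this is the estimate
# that Spark compares against spark.sql.autoBroadcastJoinThreshold
df_est = spark.range(3_000_000)
df_est.explain(mode="cost")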

So, Spark knows the exact size of a dataframe if:

  • the dataframe is the result of reading a table from Hive;

  • the dataframe is generated by, for example, spark.range();

  • the dataframe is cached.

In other cases, Spark gives a rough upper-bound estimate. Since Spark does not rebuild the execution plan on the fly, if we are sure that some intermediate dataframe will be small enough for a BroadcastJoin during the computation, we need to say so explicitly with the hint .hint("broadcast").

from pyspark.sql.functions import col

df_receipts = spark.table("receipts")
df_milk_products = spark.table("products").filter(
    col("category_name").isin(["Молоко"])
)

# Spark estimates the size of the right dataframe at more than 10 MB,
# so both dataframes will be shuffled (SortMergeJoin)
df_receipts.join(df_milk_products, on="product_id")

# Hint Spark to broadcast the right dataframe even if it takes more
# than 10 MB of memory. This way we avoid shuffling the left
# (very large!) dataframe
df_only_milk_receipts = (
    df_receipts
    .join(df_milk_products.hint("broadcast"), on="product_id")
)
On the left is the computation graph for a regular Join: first a Shuffle of both tables, then a sort, and then a SortMergeJoin. On the right is the computation graph for a BroadcastJoin: the left (larger) table does not require a Shuffle, and a BroadcastHashJoin is used instead of a SortMergeJoin.

Using a BroadcastJoin significantly reduces execution time, but you need to keep its peculiarities in mind:

  • The dataframe being broadcast must be genuinely small, so that it fits into the memory of every executor.

  • Even if the dataframe is actually very small, Spark may estimate its size very differently when the dataframe is not materialized (i.e. not cached and not a table), so you need to request the BroadcastJoin explicitly with .hint("broadcast").

  • BroadcastJoin is not applicable for Full Outer Join.

  • BroadcastJoin is not applicable for Left Join if the small dataframe is on the left, and for Right Join if the small dataframe is on the right.
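Besides .hint("broadcast"), the same effect can be achieved with the broadcast() function from pyspark.sql.functions; a small sketch, equivalent to the hinted join above:

from pyspark.sql.functions import broadcast

# Equivalent to .hint("broadcast"): mark the right dataframe for broadcast
df_only_milk_receipts = df_receipts.join(
    broadcast(df_milk_products), on="product_id"
)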

Preliminary repartition

As mentioned above, the Shuffle operation is required not only for Join, but for all other wide transformations as well. As an example, consider the following code with two consecutive GroupBy operations, together with its execution plan:

from pyspark.sql.functions import sum, mean

# Receipts in the "Молоко" (milk) category
df = df_only_milk_receipts

# Average receipt amount in the milk category, broken down by week
stats = (
    df

    # Grouping #1
    .groupby("week", "receipt_id")
    .agg(sum("amount").alias("sum_amount"))

    # Grouping #2
    .groupby("week")
    .agg(mean("sum_amount").alias("avg_amount"))

    # Execute and collect the statistics
    .collect()
)

Now imagine that the dataframe df is initially partitioned by the field "week". This would mean that all receipts of a single week are in the same partition, and therefore all rows belonging to a single receipt are in the same partition as well. Common sense suggests that no data shuffling should be required in such a scenario.

Let's check whether Spark can eliminate the unnecessary Shuffles: we add a preliminary repartition of the dataframe df by the field "week" at the beginning of the query:

df = df_only_milk_receipts

stats = (
    df

    # Add repartitioning by a key that is a subset of both
    # of the subsequent grouping keys
    .repartition("week")

    # Grouping #1
    .groupby("week", "receipt_id")
    .agg(sum("amount").alias("sum_amount"))

    # Grouping #2
    .groupby("week")
    .agg(mean("sum_amount").alias("avg_amount"))

    # Execute and collect the statistics
    .collect()
)

Indeed, at the cost of adding one preliminary repartition, we were able to get rid of the two Shuffles preceding the two groupings.
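This is easy to verify: keep the aggregation as a dataframe (without calling .collect()) and print its plan. A sketch reusing df, sum and mean from the snippet above (stats_df is just an illustrative name):

stats_df = (
    df
    .repartition("week")
    .groupby("week", "receipt_id")
    .agg(sum("amount").alias("sum_amount"))
    .groupby("week")
    .agg(mean("sum_amount").alias("avg_amount"))
)

# The physical plan should contain exactly one Exchange --
# the one introduced by .repartition("week")
stats_df.explain()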

The simple explanation is that for each operation Spark compares two partitionings:

  • The partitioning of the input dataframe. In the example above, df is pre-partitioned by the set of fields {week}.

  • The partitioning required to perform the operation. In the example above, the aggregation requires a dataframe partitioned by the set of fields {week, receipt_id}.

If the input partitioning key is a subset of the required one, Spark does not add a Shuffle operation. This is exactly what happened in our example.

Sometimes you come across long stretches of code that can be optimized by adding a single .repartition(...) line. For clarity, here is an example from a real project:

keys = ["store_id", "customer_id"]
window_1 = Window.partitionBy(*keys, "receipt_id")
window_2 = Window.partitionBy(*keys).orderBy("time")

result = (
    df

    # Первый и единственный shuffle в плане выполнения
    .repartition(*keys)

    # Благодаря BroadcastJoin не репартицируем датафрейм df
    .join(df_stores.hint("broadcast"), on="store_id")
    .join(df_products.hint("broadcast"), on="product_id")

    # Ключ партиции оконной функции включает в себя поля, по
    # которым партицирован датафрейм df
    .withColumn("quantity_sum",
        F.sum("quantity").over(window_1)
    )
    .withColumn("rto_sum",
        F.sum("price").over(window_1)
    )
    .filter(...)

    # Ключ партиции оконной функции включает в себя поля, по
    # которым партицирован датафрейм df
    .withColumn("rank",
        F.rank().over(window_2)
    )
    .filter(...)

    # Ключ группировки включает в себя поля, по которым
    # партицирован датафрейм df
    .groupby(*keys, "receipt_id")
    .agg( # ...
    )
    .groupby(*keys)
    .agg( # ...
    )

    # Ключ джойна совпадает с набором полей, по которому
    # партицирован датафрейм df. Для датафрейма big_df будет
    # добавлен shuffle по полям ["store_id", "customer_id"].
    .join(big_df, on=keys)
)

# Датафрейм result по-прежнему партицирован по полям
# ["store_id", "customer_id"]
result

In this example, the number of unique ["store_id", "customer_id"] pairs is large enough and the groups themselves are small enough, so we do not have to worry that the data will become heavily skewed after .repartition(*keys).
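Whether this assumption holds is easy to check up front; a quick sketch that inspects the largest groups (F refers to pyspark.sql.functions, as in the example above):

# Look at the biggest ["store_id", "customer_id"] groups: if the top
# counts are comparable to the average group size, .repartition(*keys)
# will not create noticeable skew
(
    df
    .groupby("store_id", "customer_id")
    .count()
    .orderBy(F.desc("count"))
    .show(10)
)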

It is also worth remembering that a dataframe can end up partitioned without a key; here are some examples:

  • df.repartition(200) will distribute the dataframe evenly into 200 partitions without a key.

  • Even if a table is stored in Hive in partitioned form, spark.table("table") will not inherit its partitioning. More on this at the end of the article, in the section on bucketing.

  • df.union(df) will multiply partitions and double their number, which means the rule "rows with the same hash are in the same partition" will be violated. In such a trivial case, the union can be rewritten as df.withColumn("n", explode(array(lit(1), lit(2)))).drop("n"), which preserves the number of partitions and the partitioning key (see the sketch below).
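Here is a small sketch comparing the two approaches (df can be any keyed dataframe); the explode variant duplicates every row inside its own partition, so both the number of partitions and the partitioning key are preserved:

from pyspark.sql import functions as F

# union: the number of partitions doubles and key partitioning is lost
doubled_union = df.union(df)

# explode: each row is duplicated within its own partition
doubled_explode = (
    df
    .withColumn("n", F.explode(F.array(F.lit(1), F.lit(2))))
    .drop("n")
)

print(doubled_union.rdd.getNumPartitions())    # 2 x the original number
print(doubled_explode.rdd.getNumPartitions())  # same as the original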

In addition, there are a couple of peculiarities associated with countDistinct and join because of which preliminary repartitioning does not work. The problems and their solutions are described below.

Problem with two or more countDistinct() aggregations

Distributed countDistinct has a not-so-obvious computation algorithm (you can read more about it, for example, here). In this article, we consider the case when two or more countDistinct calls occur in a single aggregation:

from pyspark.sql.functions import countDistinct

(
    df
    .repartition("week")  # Preliminary repartitioning
    .groupby("week", "receipt_id")
    .agg(
        countDistinct("product_id").alias("products"),
        countDistinct("brand_name").alias("brands")
    )
    .head()
)

As in the previous cases, we expected just one Shuffle, but there were three. How did this happen?

Looking at the execution plan, we notice that:

  • A new operator, Expand, has appeared, which multiplies the data. In our case, the data is duplicated 2 times – once per countDistinct() function.

  • The dataframe's partitioning key information was not preserved after the Expand operator (similar to union). That means any subsequent wide transformation will inevitably require a new Shuffle, which is exactly what we see in the execution plan.

To avoid the unnecessary Shuffles, you can use one of the following workarounds:

from pyspark.sql.functions import collect_set, size, dense_rank, max

# 1. Replace countDistinct with collect_set + size
# For very large dataframes this may raise the error
# java.lang.IllegalArgumentException: Cannot grow BufferHolder by size XXXX
# because the size after growing exceeds size limitation 2147483632
(
    df
    .repartition("week")
    .groupby("week", "receipt_id")
    .agg(
        size(collect_set("product_id")).alias("products"),
        size(collect_set("brand_name")).alias("brands")
    )
    .head()
)


# 2. Using window functions
# Requires two different sorts, which hurts execution time
from pyspark.sql import Window
window = Window.partitionBy("week", "receipt_id")
(
    df
    .repartition("week")
    .withColumn("product_id_dense_rank",
                dense_rank().over(window.orderBy("product_id")))
    .withColumn("brand_name_dense_rank",
                dense_rank().over(window.orderBy("brand_name")))
    .groupby("week", "receipt_id")
    .agg(
        max("product_id_dense_rank").alias("products"),
        max("brand_name_dense_rank").alias("brands")
    )
    .head()
)
On the left is the execution plan for replacing countDistinct with size + collect_set, on the right is the execution plan for the dense_rank + max window function.

Problem with Join key

We saw that if the partitioning key is a subset of the grouping key, GroupBy does not require an additional Shuffle. We would expect the same behavior from Join, but for some reason it is not there: Join requires that the keys match exactly. This makes life much harder, for example:

left = spark.table("left").repartition("key")
right = spark.table("right").repartition("key")

# 1. The join key exactly matches the partitioning key of both
# dataframes. No additional shuffle is required.
joined = (
    left.join(right, on="key")
    .head()
)

# 2. The join key is a superset of the partitioning key,
# but Spark still inserts an additional shuffle
joined = (
    left.join(right, on=["key", "key_2"])
    .head()
)

The execution plan for the second query is as follows:

Two Shuffles in a row is definitely not what we want.

There is a well-known hack for Inner Join: move part of the join key into a .filter. For Outer Join there is no easy way to avoid the Shuffle.

left = spark.table("left").repartition("key")
right = spark.table("right").repartition("key")

(
    left
    .join(right, on="key")
    .filter(
        # An equality condition (left.key_2 == right.key_2) would be pushed
        # by the optimizer into the join keys, so Spark has to be tricked:
        (left.key_2 <= right.key_2)
        & (left.key_2 >= right.key_2)
    )
    .head()
)
Thanks to the Spark optimizer's PushPredicateThroughJoin rule, the condition from the filter will be applied directly during the merging of rows in the SortMergeJoin.

Table bucketing

In the examples above, you can see that a Shuffle follows immediately after every table read. Suppose we know that the same transformation (e.g. a GroupBy) will always be applied to the data in a table. Can we organize the table's storage in a partitioned form so that the partitioning is preserved when it is read? That would eliminate one useless Shuffle.

Spark can actually write partitioned tables:

# The parquet file format is used by default
df.write.partitionBy("store_id").saveAsTable("datamart.receipts")

With this method of writing, the files are split into subdirectories as follows:

/user/hive/warehouse/datamart.db/receipts/
|-- store_id=1
|   `-- part-aaaaa-...-aaaaaaaaaaaa.c000.snappy.parquet
|-- store_id=2
|   `-- part-bbbbb-...-bbbbbbbbbbbb.c000.snappy.parquet
`-- store_id=3
    `-- part-ccccc-...-cccccccccccc.c000.snappy.parquet

It seems logical that Spark should inherit the table's partitioning when reading it, but it does not. For this purpose, Spark offers another way of writing tables, called bucketing. To use it, specify .bucketBy when saving the table to Hive:

N = df.rdd.getNumPartitions()
numBuckets = 200
df.write.bucketBy(numBuckets, "store_id").saveAsTable("datamart.receipts")

With this method of writing, the table is split into N ⨉ numBuckets files, where N is the number of partitions of the dataframe df:

/user/hive/warehouse/datamart.db/receipts/
|-- part-11111-...-111111111111_00000.c000.snappy.parquet
|-- part-11111-...-111111111111_00001.c000.snappy.parquet
|-- ...
|-- part-11111-...-111111111111_00199.c000.snappy.parquet
|-- part-22222-...-222222222222_00000.c000.snappy.parquet
|-- ...
`-- part-NNNNN-...-NNNNNNNNNNNN_00199.c000.snappy.parquet

When reading such a table, Spark creates a dataframe with exactly 200 partitions and knows that it is partitioned by the field "store_id". With some caveats, we can say that the following two examples produce identical dataframes:

# 1. Bucketing
df.write.bucketBy(200, "store_id").saveAsTable("datamart.receipts")
df = spark.table("datamart.receipts")

# 2. Repartitioning
df = df.repartition(200, "store_id")
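You can confirm that a table was indeed saved as bucketed by looking at its metadata; a sketch (the exact output layout depends on your metastore):

# For a bucketed table, the output of DESCRIBE FORMATTED includes
# the "Num Buckets" and "Bucket Columns" rows
spark.sql("DESCRIBE FORMATTED datamart.receipts").show(50, truncate=False)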

And now, when we apply .groupBy, there is no longer a Shuffle preceding it:

from pyspark.sql.functions import sum, mean

# The table datamart.receipts is bucketed into 200 buckets by the
# field "store_id", so the dataframe df has 200 partitions
# partitioned by "store_id"
df = spark.table("datamart.receipts")

# The execution plan of the following query will not contain
# a single shuffle operation
stats = (
    df
    .groupby("store_id", "receipt_id")
    .agg(sum("amount").alias("sum_amount"))
    .groupby("store_id")
    .agg(mean("sum_amount").alias("avg_amount"))
    .collect()
)
Thanks to bucketing, there is not a single Shuffle left.

Bucketing has its drawbacks:

  • The number of buckets must be specified (argument numBuckets). If the number of buckets is less than the number of workers, some workers will not receive data at all and will be idle.

  • In the worst case, bucketing a table results in the creation of N ⨉ numBuckets files, where N is the number of partitions of the dataframe df. This can be avoided by repartitioning the dataframe by the same columns before writing: df.repartition(200, *keys).write.bucketBy(200, *keys).saveAsTable(...) (see the sketch below).
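A minimal sketch of this trick from the last point above (re-writing the datamart.receipts table here is purely illustrative):

# Repartitioning by the bucketing columns into the same number of
# partitions right before the write makes each task produce data for
# a single bucket, so the table ends up with at most numBuckets files
# instead of N x numBuckets
(
    df
    .repartition(200, "store_id")
    .write
    .mode("overwrite")
    .bucketBy(200, "store_id")
    .saveAsTable("datamart.receipts")
)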

Conclusion

Shuffle is an essential part of any Spark application, but it is time-consuming and puts a lot of strain on the network and disk. Minimizing the number of Shuffles is important for reducing job execution time.

In this article, we covered the following query transformation methods:

  • BroadcastJoin: the hint .hint("broadcast") tells Spark that a small dataframe can be sent to all workers. This avoids the Shuffle in Join operations.

  • Preliminary repartition of dataframes: eliminates unnecessary Shuffles for successive transformations with the same key. To do this, add a .repartition call. Keep in mind that such repartitioning may lead to data skew.

  • Table bucketing: Allows you to organize your data in a way that avoids shuffle after reading it. Bucketing is especially useful in scenarios where the same transformations are always applied to the data in a table.

I also want to thank Ilya Tkachev, Ilia Chernikov and Andrey Mazur for their support and contribution to this article.
