“School” course on Apache Spark: optimization
A guide to Apache Spark that is not for beginners.
In the previous article, “SPARK for little ones,” I wrote about Apache Spark's data-processing capabilities and features. We focused on the key functions of reading, processing, and storing data, with code examples to help beginners get started quickly.
In this article, we'll go deeper and look at optimization. We'll focus on basic concepts, query optimization, and joins. With examples, of course.
Table of contents
#1. Basic Optimization Concepts
— Data schema
— Caching and Persistence
— Partition management
#2. Query optimization
Choosing the right operations
— Transformations
— Actions
— The principle of lazy evaluation
— How to choose the right operations
— Using map and flatMap
— Using reduceByKey and groupByKey
The order of operations
— The Importance of Performing Filtering and Aggregation Before Joins
— Strategies for building queries from smaller tables to larger ones
— Different strategies for reducing data before joining
Why optimize Spark?
There are several good reasons for this.
Reduced task completion time.
Optimization can significantly reduce the time required to perform various computational operations. This is especially important for processing large amounts of data, where even small improvements can lead to significant reductions in execution time.
Improving the efficiency of resource use.
Optimization helps you use computing resources such as CPU time and RAM more efficiently. This lets you process more data with the same resources and helps prevent system overload.
Improving application performance.
Optimized Spark applications run faster and with lower latency. This is especially important for real-time applications where data processing speed is critical.
Cost reduction.
Efficient use of resources also results in lower infrastructure costs, as less computing power and memory are required to process the same amounts of data.
Stability and reliability.
Optimization helps avoid performance issues such as freezes and crashes, making systems more stable and reliable in operation.
Scalability.
Optimized solutions are easier to scale because they use resources more efficiently and can handle increasing amounts of data without significantly increasing execution time.
Thus, optimizing Apache Spark not only speeds up task execution and improves overall system performance, but also helps reduce operating costs and increase the reliability and scalability of solutions.
Let's move on to concepts.
#1. Basic Optimization Concepts
This chapter covers the data schema, caching and persistence, and partition management. Let's start with the data schema.
Data schema
Optimizing Apache Spark starts with effective data schema management. This is one of the key concepts that significantly impacts the performance of your applications.
Using an explicit schema instead of automatic type detection
Spark can detect data types automatically when reading formats such as CSV and JSON: JSON schemas are inferred by default, while CSV columns are inferred only when inferSchema is enabled (otherwise every CSV column is read as a string). Parquet files store their schema in the file metadata, so no inference from the data is needed. Inference is convenient for rapid development, but automatic data type detection (schema inference) can hurt performance and correctness.
Here's why:
Processing time. To infer types, Spark has to make an extra pass over the data (or a sample of it) and analyze the values in each column before the actual job starts. This adds a step to the data reading process and increases the overall execution time of the task.
Type inference errors. Automatic detection is not always correct and can assign an unsuitable type to some columns. This may cause errors later in data processing.
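To make both points concrete, here is a toy model of type inference written in plain Python. It is not Spark's inference code; the function names (infer_column_type, infer_schema) and the sample columns are hypothetical. It shows that inference must look at every value before any real work starts, and that a column such as a ZIP code can end up with a plausible but wrong numeric type.

```python
# Toy model of CSV type inference in plain Python (not Spark's implementation):
# choosing a type for a column requires examining every value in it.


def infer_column_type(values):
    """Return 'int', 'double' or 'string' for a list of raw CSV strings."""
    def all_parse_as(cast):
        for value in values:
            try:
                cast(value)
            except ValueError:
                return False
        return True

    if all_parse_as(int):
        return "int"
    if all_parse_as(float):
        return "double"
    return "string"


def infer_schema(header, rows):
    """One full pass over the rows (the extra work) to infer each column's type."""
    columns = zip(*rows)  # transpose rows into columns
    return {name: infer_column_type(list(col)) for name, col in zip(header, columns)}


header = ["name", "age", "zip_code"]
rows = [["Alice", "34", "02134"], ["Bob", "45", "10001"]]

schema = infer_schema(header, rows)
print(schema)  # {'name': 'string', 'age': 'int', 'zip_code': 'int'}

# zip_code "looks" numeric, so it is typed as int and "02134" would become 2134,
# silently losing the leading zero. Declaring it as a string avoids this.
print(int(rows[0][2]))  # 2134
```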
Specifying the data schema explicitly has several significant advantages.
Speed up data reading. When the data schema is explicitly specified, Spark does not waste time analyzing the data structure. This reduces the time it takes to read data, especially when working with large files.
Optimizing the execution plan. When the exact data types and table structure are known in advance, Spark works with correctly typed columns (for example, integers rather than strings), which helps it plan and execute operations more efficiently.
Preventing errors. Having an explicit schema reduces the risk of errors due to incorrect data type definitions, making data processing more reliable.
Compatibility with other tools. An explicitly defined schema can be easily integrated with other tools and systems, improving compatibility and simplifying data exchange.
Let's look at an example of reading a CSV file with an explicit data schema definition.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Create a SparkSession
spark = SparkSession.builder.appName("SchemaExample").getOrCreate()
# Define the data schema: column name, type, nullable
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])
# Read the CSV file with the explicitly defined schema (the file is assumed to have a header row)
df = spark.read.schema(schema).csv("path/to/file.csv", header=True)
# Show the first 5 rows
df.show(5)
In this example:
we create a SparkSession to work with data;
we define the data schema using StructType and StructField;
we read a CSV file using the explicitly defined schema;
we display the first 5 rows for verification.
Caching and Persistence
Caching and data persistence in Spark are important performance optimization techniques. They let you store the results of intermediate computations in memory or on disk so that those computations do not have to be repeated.
When to use cache() and when persist(), and how?
Use cache():
When the same set of data needs to be reused in several subsequent actions.
Suitable for frequently accessed data, as caching keeps data in memory (RAM) where possible, which speeds up access.
Use persist():
When it is necessary to control the level of data storage (e.g. in memory, on disk, or a combination).
Useful for large data sets that do not fit entirely in memory, or for fault tolerance.
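Difference between cache() and persist():
cache(): uses the default storage level. For RDDs it is equivalent to calling persist(StorageLevel.MEMORY_ONLY), which stores data only in memory; for DataFrames the default is a memory-and-disk level, so partitions that don't fit in memory spill to disk.
persist(): lets you choose the storage level that fits your needs.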
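Storage levels:
MEMORY_ONLY. Stores data only in memory. Fast, but requires a lot of memory; partitions that don't fit are recomputed when needed.
MEMORY_AND_DISK. Stores data in memory and, if there is not enough memory, saves the remaining partitions to disk.
DISK_ONLY. Stores data only on disk. Slower, but saves memory.
MEMORY_ONLY_SER. Stores data in memory in serialized form. Saves memory, but increases serialization/deserialization costs.
MEMORY_AND_DISK_SER. A combination of MEMORY_ONLY_SER and DISK_ONLY: serialized data in memory, spilling to disk when memory runs out.
The _SER levels belong to the Scala/Java API; in PySpark, cached data is always stored in serialized (pickled) form, so these constants are not exposed there.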
Example using cache().
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("CacheExample").getOrCreate()
# Read data from a CSV file
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
# Cache the DataFrame (lazy: the cache is filled by the first action)
df.cache()
# First action: count the rows
count = df.count()
print(f"Total count: {count}")
# Second action: filter the data and count the matching rows
filtered_df = df.filter(df['age'] > 30)
filtered_count = filtered_df.count()
print(f"Filtered count: {filtered_count}")
# Remove the DataFrame from the cache
df.unpersist()
# Stop the SparkSession
spark.stop()
Example using persist().
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
# Create a SparkSession
spark = SparkSession.builder.appName("PersistExample").getOrCreate()
# Read data from a CSV file
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
# Persist the DataFrame with the MEMORY_AND_DISK storage level
df.persist(StorageLevel.MEMORY_AND_DISK)
# First action: count the rows
count = df.count()
print(f"Total count: {count}")
# Second action: filter the data and count the matching rows
filtered_df = df.filter(df['age'] > 30)
filtered_count = filtered_df.count()
print(f"Filtered count: {filtered_count}")
# Release the persisted data
df.unpersist()
# Stop the SparkSession
spark.stop()
Examples showing the difference in performance.
Without caching.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("NoCacheExample").getOrCreate()
# Read data from a CSV file
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
# First action: count the rows (the data is read from the source)
count = df.count()
print(f"Total count: {count}")
# Second action: filter and count the rows (the data is read from the source again)
filtered_df = df.filter(df['age'] > 30)
filtered_count = filtered_df.count()
print(f"Filtered count: {filtered_count}")
# Stop the SparkSession
spark.stop()
With caching.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("CacheExample").getOrCreate()
# Read data from a CSV file
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
# Cache the DataFrame
df.cache()
# First action: count the rows (the data is read and cached)
count = df.count()
print(f"Total count: {count}")
# Second action: filter and count the rows (the data comes from the cache)
filtered_df = df.filter(df['age'] > 30)
filtered_count = filtered_df.count()
print(f"Filtered count: {filtered_count}")
# Remove the DataFrame from the cache
df.unpersist()
# Stop the SparkSession
spark.stop()
Difference in performance.
No caching/persistence. For each action (e.g. each call to count), Spark re-reads the data from the source file and recomputes all transformations, which incurs a significant time overhead.
With caching/persistence. The data is read from the source only once, during the first action, and stored in memory or on disk. Subsequent actions run faster because they read from the cache instead of the source data.
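A toy model in plain Python can make the difference visible by counting how often the source is read. ToyDataset and read_source are hypothetical stand-ins, not Spark classes; the sketch only mimics the idea that cache() marks a dataset and the first action materializes it.

```python
# Toy model of caching in plain Python (not the Spark API): an "action"
# recomputes the data from its source unless the dataset has been cached.


class ToyDataset:
    def __init__(self, load):
        self._load = load          # function that reads the source data
        self._use_cache = False
        self._cached = None        # materialized rows, filled lazily

    def cache(self):
        # Like Spark's cache(), this only marks the dataset; nothing is read yet.
        self._use_cache = True
        return self

    def _rows(self):
        if not self._use_cache:
            return self._load()          # no cache: read the source again
        if self._cached is None:
            self._cached = self._load()  # the first action fills the cache
        return self._cached

    def count(self):
        return len(self._rows())

    def count_where(self, predicate):
        return sum(1 for row in self._rows() if predicate(row))


reads = {"source": 0}


def read_source():
    reads["source"] += 1
    return [{"age": age} for age in (25, 34, 45, 29, 40)]


# Without caching: two actions, two reads of the source.
plain = ToyDataset(read_source)
plain.count()
plain.count_where(lambda row: row["age"] > 30)
print(reads["source"])  # 2

# With caching: the first action reads and stores, the second reuses the result.
reads["source"] = 0
cached = ToyDataset(read_source).cache()
cached.count()
cached.count_where(lambda row: row["age"] > 30)
print(reads["source"])  # 1
```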
Partition management
Managing data partitions in Apache Spark is an important part of performance optimization. Properly partitioning data allows for more efficient use of resources and faster task execution.
Splitting data into partitions
When data is loaded into Spark, it is automatically partitioned.
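A partition is a logical chunk of data that is processed by a single task on one executor in a Spark cluster. The default number of partitions depends on the data source and Spark configuration; for file-based sources, for example, it is largely determined by spark.sql.files.maxPartitionBytes (128 MB by default).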
How do you choose the right number of partitions?
Setting the number of partitions depends on the volume of data and available resources in the cluster. Basic recommendations include:
Balance between partitions. Partitions should be approximately the same size to distribute the load evenly between nodes.
Number of partitions. A common rule of thumb is about 2-4 partitions per CPU core in the cluster. For example, a cluster of 8 nodes with 4 CPU cores each has 32 cores, so the total number of partitions could be anywhere from 64 to 128.
Partition size. The optimal size is usually between 128 MB and 1 GB.
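As a worked example of these rules of thumb, the two helpers below (hypothetical names, plain Python) compute the core-based range and a size-based count. They assume a target of 128 MiB per partition and an illustrative 10 GiB dataset on the 8-node, 4-core cluster from the example above.

```python
import math


def partition_range(nodes, cores_per_node, per_core=(2, 4)):
    """Rule of thumb from the text: about 2-4 partitions per CPU core."""
    total_cores = nodes * cores_per_node
    return total_cores * per_core[0], total_cores * per_core[1]


def partitions_for_size(total_bytes, target_bytes=128 * 1024**2):
    """Partitions needed so that each holds roughly target_bytes (128 MiB assumed)."""
    return max(1, math.ceil(total_bytes / target_bytes))


low, high = partition_range(nodes=8, cores_per_node=4)
by_size = partitions_for_size(10 * 1024**3)  # hypothetical 10 GiB dataset

print(low, high)               # 64 128
print(by_size)                 # 80
print(low <= by_size <= high)  # True
```

Here the size-based estimate (80) falls inside the core-based range (64 to 128), so both rules point to a similar starting value; when they disagree, measuring the actual job is more reliable than either formula.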
Using repartition and coalesce
Spark provides repartition and coalesce methods to change the number of partitions.
The repartition method increases or decreases the number of partitions. It performs a full shuffle of the data, which can be expensive, but it distributes the data evenly across the new partitions.
The coalesce method only reduces the number of partitions. It merges existing partitions instead of performing a full shuffle, which makes it cheaper than repartition, but the resulting partitions can be uneven. Requesting more partitions than currently exist with coalesce has no effect.
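The behavioral difference can be sketched with lists standing in for partitions. This is a simplified plain-Python model, not Spark's implementation: repartition is modeled as a round-robin redistribution of every record, and coalesce as merging neighbouring partitions without moving individual records between them.

```python
from itertools import chain


def repartition(partitions, n):
    """Full shuffle: every record is redistributed round-robin into n partitions."""
    records = list(chain.from_iterable(partitions))
    out = [[] for _ in range(n)]
    for i, record in enumerate(records):
        out[i % n].append(record)
    return out


def coalesce(partitions, n):
    """Merge neighbouring partitions into n groups; whole partitions are merged, none is split.
    Asking for at least as many partitions as exist leaves them unchanged."""
    if n >= len(partitions):
        return [list(p) for p in partitions]
    out = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        out[i * n // len(partitions)].extend(part)  # consecutive partitions share a group
    return out


parts = [list(range(1, 9)), [9], [10], [11, 12]]  # skewed: 8, 1, 1, 2 records

print([len(p) for p in repartition(parts, 3)])  # [4, 4, 4]
print([len(p) for p in coalesce(parts, 2)])     # [9, 3]
print(len(coalesce(parts, 10)))                 # 4
```

The skewed input shows the trade-off: repartition produces balanced partitions at the cost of moving every record, while coalesce is cheaper but inherits whatever imbalance the merged partitions had.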
An example of checking how data is split into partitions when it is loaded.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("PartitioningExample").getOrCreate()
# Read data from a CSV file (Spark splits it into partitions automatically)
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
# Check the number of partitions
print(f"Number of partitions: {df.rdd.getNumPartitions()}")
An example of setting the number of partitions right after loading data.
# Load the data and immediately repartition it into 100 partitions (this triggers a shuffle)
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True).repartition(100)
# Check the number of partitions
print(f"Number of partitions after repartition: {df.rdd.getNumPartitions()}")
Using repartition.
# Increase the number of partitions to 100 (full shuffle)
df_repartitioned = df.repartition(100)
# Check the number of partitions after repartition
print(f"Number of partitions after repartition: {df_repartitioned.rdd.getNumPartitions()}")
Using coalesce.
# Reduce the number of partitions to 10 (merges partitions, no full shuffle)
df_coalesced = df.coalesce(10)
# Check the number of partitions after coalesce
print(f"Number of partitions after coalesce: {df_coalesced.rdd.getNumPartitions()}")
An example of using repartition to increase the number of partitions.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("RepartitionExample").getOrCreate()
# Read data from a CSV file
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
# Initial number of partitions
print(f"Initial number of partitions: {df.rdd.getNumPartitions()}")
# Increase the number of partitions to 100
df_repartitioned = df.repartition(100)
# Check the number of partitions after repartition
print(f"Number of partitions after repartition: {df_repartitioned.rdd.getNumPartitions()}")
# Stop the SparkSession
spark.stop()
An example of using coalesce to reduce the number of partitions.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("CoalesceExample").getOrCreate()
# Read data from a CSV file
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
# Initial number of partitions
print(f"Initial number of partitions: {df.rdd.getNumPartitions()}")
# Reduce the number of partitions to 10
# (if the DataFrame already has 10 or fewer partitions, coalesce leaves the count unchanged)
df_coalesced = df.coalesce(10)
# Check the number of partitions after coalesce
print(f"Number of partitions after coalesce: {df_coalesced.rdd.getNumPartitions()}")
# Stop the SparkSession
spark.stop()
#2. Query optimization
Optimizing queries in Apache Spark involves choosing the right operations and understanding principles such as lazy evaluation. Let's look at the difference between transformations (e.g. map, filter) and actions (e.g. count, collect), and how to choose the right operations to improve performance.
Choosing the right operations
In Spark, all operations can be divided into two categories: transformations and actions.
Transformations
These are operations that create a new distributed dataset (an RDD or a DataFrame) from an existing one without performing any computation immediately. Transformations are lazy: they are not executed until an action is called.
Example of map transformation.
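from pyspark.sql import SparkSession
sc = SparkSession.builder.appName("RDDExample").getOrCreate().sparkContext  # SparkContext used by the RDD examples
rdd = sc.parallelize([1, 2, 3, 4, 5])
squared_rdd = rdd.map(lambda x: x * x)  # lazy: nothing is computed yet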
filter, flatMap, reduceByKey, and groupByKey are also transformations.
Actions
These are operations that trigger computation and either return a result to the driver or write it to storage. Calling an action executes all the transformations it depends on.
Examples of actions are count, collect, take, saveAsTextFile.
Count action example.
count = squared_rdd.count()
print(f"Number of elements: {count}")
The principle of lazy evaluation
Lazy evaluation is a key principle of Spark, which defers the execution of transformations until an action is called. This allows Spark to optimize the execution plan by combining transformations and minimizing the number of passes over the data.
Example.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("LazyEvaluationExample").getOrCreate()
# Read data from a CSV file
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
# Transformations (lazy: nothing is computed yet)
filtered_df = df.filter(df['age'] > 30)
selected_df = filtered_df.select("name", "age")
# Action (triggers execution of all preceding transformations)
result = selected_df.collect()
# Print the results
for row in result:
    print(row)
# Stop the SparkSession
spark.stop()
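The deferral itself can be imitated with Python generators. In this plain-Python sketch (read_rows, lazy_filter, and lazy_select_name are hypothetical helpers, not Spark functions), building the pipeline reads nothing; only consuming it, the analogue of an action, pulls the rows through the filter and the projection in a single pass.

```python
# Toy model of lazy evaluation using Python generators (not Spark itself):
# "transformations" only build a pipeline; an "action" pulls data through it.

log = []


def read_rows():
    for row in [("Alice", 34), ("Bob", 45), ("Charlie", 29)]:
        log.append(f"read {row[0]}")
        yield row


def lazy_filter(rows, predicate):
    return (row for row in rows if predicate(row))


def lazy_select_name(rows):
    return (name for name, _age in rows)


# "Transformations": build the pipeline; nothing has been read yet.
pipeline = lazy_select_name(lazy_filter(read_rows(), lambda r: r[1] > 30))
print(log)  # []

# "Action": consuming the pipeline runs every step in a single pass.
result = list(pipeline)
print(result)  # ['Alice', 'Bob']
print(log)     # ['read Alice', 'read Bob', 'read Charlie']
```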
How to choose the right operations?
Reduce data with transformations before calling actions.
Where possible, avoid actions that bring data to the driver (e.g. collect) on large datasets, as this can exhaust the driver's memory. Instead, use transformations to shrink the data before calling the action.
Bad example: collect on big data.
large_rdd = sc.parallelize(range(1000000))
collected_data = large_rdd.collect()  # may exhaust the driver's memory on truly large data
A good example: reducing data before collect.
filtered_rdd = large_rdd.filter(lambda x: x % 2 == 0)
small_collected_data = filtered_rdd.take(10)  # safer: only a small amount of data reaches the driver
Using map and flatMap
Use map and flatMap to transform data:
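map applies a function to each element of the RDD and returns exactly one output element per input element, so the new RDD has the same number of elements;
flatMap applies a function that returns zero or more elements per input element and flattens the results, so the new RDD can have a different number of elements.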
Example of using map.
rdd = sc.parallelize(["apple", "banana", "cherry"])
length_rdd = rdd.map(lambda x: len(x))
print(length_rdd.collect())
Example of using flatMap.
words_rdd = rdd.flatMap(lambda x: x.split("a"))
print(words_rdd.collect())
Using reduceByKey and groupByKey
For data aggregation, prefer reduceByKey to groupByKey: reduceByKey combines the values for each key within each partition before the shuffle, so less data is sent over the network, while groupByKey ships every key-value pair across the network and only then groups them.
Example of using reduceByKey.
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
reduced_pairs = pairs.reduceByKey(lambda x, y: x + y)
print(reduced_pairs.collect())
Example of using groupByKey (less efficient).
grouped_pairs = pairs.groupByKey()
print([(x, list(y)) for x, y in grouped_pairs.collect()])
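Why reduceByKey is cheaper can be shown by counting the records that would cross the network. The sketch below is a plain-Python model (lists stand in for partitions, and the helper names are hypothetical) of the map-side combine that reduceByKey performs, compared with groupByKey, which ships every pair.

```python
# Toy model (plain Python): each inner list is one partition of (key, value) pairs.
partitions = [
    [("a", 1), ("b", 1), ("a", 1), ("a", 1)],
    [("b", 1), ("a", 1), ("b", 1)],
]


def shuffled_records_group_by_key(parts):
    """groupByKey: every single pair is sent across the network."""
    return sum(len(p) for p in parts)


def map_side_combine(part, func):
    """reduceByKey step 1: combine the values for each key inside one partition."""
    combined = {}
    for key, value in part:
        combined[key] = func(combined[key], value) if key in combined else value
    return list(combined.items())


def reduce_by_key(parts, func):
    """Combine locally, 'shuffle' the partial results, then merge them per key."""
    partials = [map_side_combine(p, func) for p in parts]
    shuffled = sum(len(p) for p in partials)  # records that cross the network
    merged = {}
    for part in partials:
        for key, value in part:
            merged[key] = func(merged[key], value) if key in merged else value
    return merged, shuffled


totals, shuffled = reduce_by_key(partitions, lambda x, y: x + y)
print(totals)                                     # {'a': 4, 'b': 3}
print(shuffled)                                   # 4
print(shuffled_records_group_by_key(partitions))  # 7
```

With 7 input pairs and 2 distinct keys spread over 2 partitions, the combine step shrinks what is shuffled from 7 records to 4; the more often a key repeats within a partition, the larger the saving.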
Example of query optimization.
Let's look at an example where we want to calculate the average age of users over 30 years old from a CSV file.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create a SparkSession
spark = SparkSession.builder.appName("QueryOptimizationExample").getOrCreate()
# Read data from a CSV file
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
# Transformations: filter first, then aggregate (avg = sum / count, computed in one pass)
filtered_df = df.filter(df['age'] > 30)
avg_df = filtered_df.agg(F.avg("age").alias("average_age"))
# A single action runs all transformations and returns one row
average_age = avg_df.collect()[0]["average_age"]
print(f"Average age: {average_age}")
# Stop the SparkSession
spark.stop()
In this example, we minimize the data volume by applying filtering before aggregation and avoiding redundant actions. All transformations are combined and executed together when the action is called, allowing Spark to optimize query execution.
The order of operations
The correct order of operations in Spark has a significant impact on the performance of applications. In particular, performing filtering and aggregation before join operations and strategically building queries from smaller tables to larger ones can significantly improve the efficiency of data processing.
Why is it important to perform filtering and aggregation before joins?
Reducing the volume of data: filtering and aggregation reduce the amount of data that must be processed in a subsequent join. This lowers the cost of transferring data over the network and reduces the amount of data involved in the join itself.
Resource reduction: smaller amounts of data require fewer resources to store and process, allowing for more efficient use of computing resources.
Reduced execution time: smaller amounts of data are processed faster, which reduces overall query execution time.
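Here is a minimal plain-Python sketch of the first point, using the same sample employees and departments as the example that follows. hash_join is a hypothetical helper that also reports how many rows it had to process, so the two orderings can be compared.

```python
employees = [("Alice", 34, "HR"), ("Bob", 45, "IT"), ("Charlie", 29, "HR"), ("David", 40, "Finance")]
departments = {"HR": "Human Resources", "IT": "Information Technology", "Finance": "Financial Department"}


def hash_join(rows, lookup):
    """Join (name, age, dept) rows with a dept -> name lookup table.
    Returns the joined rows and how many input rows the join had to process."""
    joined = [(name, age, dept, lookup[dept]) for name, age, dept in rows if dept in lookup]
    return joined, len(rows)


# Join first, filter afterwards: all 4 employees go through the join.
joined_all, processed_late = hash_join(employees, departments)
late = [row for row in joined_all if row[1] > 30]

# Filter first, then join: only the 3 matching employees go through the join.
early, processed_early = hash_join([r for r in employees if r[1] > 30], departments)

print(processed_late, processed_early)  # 4 3
print(early == late)                    # True: same result, less work
```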
An example of performing filtering before a join.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("OptimizationExample").getOrCreate()
# Sample data
data1 = [("Alice", 34, "HR"), ("Bob", 45, "IT"), ("Charlie", 29, "HR"), ("David", 40, "Finance")]
data2 = [("HR", "Human Resources"), ("IT", "Information Technology"), ("Finance", "Financial Department")]
df1 = spark.createDataFrame(data1, ["name", "age", "dept"])
df2 = spark.createDataFrame(data2, ["dept", "department_name"])
# Filter before the join
filtered_df1 = df1.filter(df1['age'] > 30)
# Join
joined_df = filtered_df1.join(df2, filtered_df1.dept == df2.dept)
# Show the results
joined_df.show()
# Stop the SparkSession
spark.stop()
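In this example, the df1 data is first filtered to keep only records with an age greater than 30, and only then joined with df2. This reduces the amount of data involved in the join and improves performance. With DataFrames, Spark's Catalyst optimizer can often push such filters below a join automatically (predicate pushdown), but filtering early makes the intent explicit and matters most where the optimizer cannot do this for you, for example in RDD code.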
Strategies for building queries from smaller tables to larger ones
Why is it important?
Efficient use of memory. Starting with smaller tables helps avoid memory bottlenecks because smaller amounts of data are easier to process and cache.
Less shuffling. Joining the smaller tables first keeps intermediate results small, which reduces the amount of data shuffled over the network and the overall execution time.
Faster access to results. Processing smaller tables allows you to quickly obtain intermediate results that can be used for further operations.
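One concrete mechanism behind the shuffle savings is the broadcast hash join: the small table is copied to every worker, and each partition of the large table is joined locally, so the large table is not shuffled. In Spark this strategy is chosen automatically for tables smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default) and can be requested explicitly with pyspark.sql.functions.broadcast. The plain-Python model below only illustrates the idea; the table contents and the helper name are hypothetical.

```python
# Toy broadcast hash join (plain Python). Assumption for illustration: the small
# table fits in memory on every worker, so each partition of the large table can
# be joined locally without shuffling it.

small = [("HR", "Human Resources"), ("IT", "Information Technology")]
large_partitions = [
    [("HR", 1), ("IT", 2), ("Finance", 3)],
    [("HR", 4), ("IT", 5), ("Finance", 6)],
]


def broadcast_hash_join(small_rows, partitions):
    lookup = dict(small_rows)  # built once and "broadcast" to every partition
    result = []
    for part in partitions:    # each partition is joined independently, in place
        for dept, ident in part:
            if dept in lookup:  # inner join: unmatched rows are dropped
                result.append((dept, ident, lookup[dept]))
    return result


rows = broadcast_hash_join(small, large_partitions)
print([ident for _dept, ident, _name in rows])  # [1, 2, 4, 5]
```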
An example of a query building strategy.
Let's look at an example where we need to perform multiple joins, starting with smaller tables.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("JoinOptimizationExample").getOrCreate()
# Sample data
data_small = [("Alice", 34, "HR"), ("Bob", 45, "IT")]
data_medium = [("HR", "Human Resources"), ("IT", "Information Technology"), ("Finance", "Financial Department")]
data_large = [("HR", 1), ("IT", 2), ("Finance", 3), ("HR", 4), ("IT", 5), ("Finance", 6)]
df_small = spark.createDataFrame(data_small, ["name", "age", "dept"])
df_medium = spark.createDataFrame(data_medium, ["dept", "department_name"])
df_large = spark.createDataFrame(data_large, ["dept", "id"])
# Join the small table with the medium one
joined_small_medium = df_small.join(df_medium, "dept")
# Filter after the first join
filtered_join = joined_small_medium.filter(joined_small_medium['age'] > 30)
# Join with the large table
final_join = filtered_join.join(df_large, "dept")
# Show the results
final_join.show()
# Stop the SparkSession
spark.stop()
In this example:
Joining a small table to a medium one. First, we join df_small and df_medium. These tables are small, so the join is fast and requires few resources.
Filtering after the first join. After the first join, we filter to keep only records with an age greater than 30. This further reduces the data volume.
Joining with the large table. Only after reducing the data volume do we join the result with the large df_large table.
Different strategies for reducing data before joining
Using aggregation. Aggregate before joining to reduce the data volume.
Using transitivity. For example, if you need to inner-join three tables on the same key, the join order does not change the result, so join the smaller tables first.
Caching. Cache intermediate results to avoid recomputation and reduce system load.
An example of using aggregation before joining.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create a SparkSession
spark = SparkSession.builder.appName("AggregationBeforeJoinExample").getOrCreate()
# Sample data
data1 = [("Alice", 34, "HR"), ("Bob", 45, "IT"), ("Charlie", 29, "HR"), ("David", 40, "Finance"), ("Eve", 50, "IT")]
data2 = [("HR", "Human Resources"), ("IT", "Information Technology"), ("Finance", "Financial Department")]
df1 = spark.createDataFrame(data1, ["name", "age", "dept"])
df2 = spark.createDataFrame(data2, ["dept", "department_name"])
# Aggregate before the join (average age per department, column named avg_age)
agg_df1 = df1.groupBy("dept").agg(F.avg("age").alias("avg_age"))
# Join with the departments table
joined_df = agg_df1.join(df2, "dept")
# Show the results
joined_df.show()
# Stop the SparkSession
spark.stop()
In this example, we first perform an aggregation to calculate the average age by department, and then join the results to the departments table. This reduces the amount of data involved in the join and improves performance.
Conclusion
I hope this article helped you understand the basics of Apache Spark performance optimization and answered some of your questions. Happy data mining!