SPARK for “kids”
Examples of Python code for working with Apache Spark for the “little ones” (and a few “pictures”).
In the previous article we looked at an example of creating a Spark session; here we'll talk about Spark's capabilities and functions for data processing. Now I can simply send this article to all my newcomers.
This article provides an overview of the main features of Apache Spark and discusses how they can be used in real-world data processing tasks. Apache Spark is a powerful and flexible system for processing large volumes of data, offering a wide range of capabilities for analytics and machine learning. In our review, we'll focus on the key features of reading, processing, and storing data, showcasing code examples that will help newbies get up to speed and start using these features in their projects.
It is important to understand that while the information here is extensive and useful, to fully grasp all aspects of working with Spark you should also read the official documentation. It contains a complete guide to every feature, along with detailed instructions for installing, configuring, and optimizing the system. To explore further, we recommend visiting the official Apache Spark documentation page.
With these basic instructions and the examples from our article in hand, you will be well prepared to get started.
The article is organized so that you can quickly find the functions and capabilities you need.
No. 1. Reading
Reading from a table
df = spark.read.table("s_schema.new_table")
This method is used to load data from a table into a Spark DataFrame. This is a convenient way to get started working with the data in your data warehouse. It's especially useful when you're working with well-structured data that has a predefined schema.
Reading from a directory
df = spark.read.parquet('/schema/dir/db/table_name/partition_1/')
This method allows you to load data directly from the file system. Parquet is an efficient and compact storage format that is suitable for working with large volumes of data, as it provides high data compression and optimized performance through columnar storage.
Reading via SQL query (we'll use Hive as an example)
df = spark.sql('SELECT * FROM SCHEMA.NEW_TABLE')
Using SQL queries in Spark allows you to take advantage of the full power of SQL to analyze the data residing in your Spark cluster. This is especially useful if you are already familiar with SQL. Spark SQL allows you to perform complex queries, joins, and aggregations, making your code more readable.
Reading CSV files
df = spark.read.format("csv").option("header", "true").load("path/to/file.csv")
This method is used to read CSV files. The header option indicates whether the first row contains column headers, and the load method reads the file from the specified path. This is a convenient way to work with tabular data in CSV format.
Reading JSON files
df = spark.read.json("path/to/file.json")
The read.json method allows you to load JSON files directly into a DataFrame. Spark automatically interprets the JSON structure and converts it into a table. This is especially useful when working with data that has a complex nested structure.
Reading data from JDBC sources
df = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:port/database") \
.option("dbtable", "schema.table") \
.option("user", "username") \
.option("password", "password") \
.load()
This method allows you to connect to databases via JDBC. You can specify the connection URL, table name, username and password. This is ideal for cases where data is already stored in a relational database and you want to analyze it using Spark.
Reading Avro files
df = spark.read.format("avro").load("path/to/file.avro")
Avro is a binary serialization format that provides compact, fast, and efficient data storage. Reading data in the Avro format allows for easy integration with systems that use this format to store data.
Reading text files
df = spark.read.text("path/to/textfile.txt")
If your data is in plain text format, this method will load the text file as a DataFrame with a single value column containing lines of text. This is useful for text mining or log file processing tasks.
No. 2. Processing: data analysis (filtering and sorting, aggregation)
Data filtering
filtered_df = df.filter(df["age"] > 30)
The filter method selects rows based on a given condition; it is the analogue of the SQL WHERE clause. The example above selects all users older than 30.
Grouping data
grouped_df = df.groupBy("city").count()
The groupBy method groups data by one or more columns so that aggregate functions (for example, count, max, min, avg) can then be applied. This example counts the number of records for each city.
Sorting data
sorted_df = df.orderBy("age")
The orderBy method sorts data by one or more columns. In the example, the data is sorted by age.
Data Aggregation
aggregated_df = df.agg({"salary": "avg"})
The agg method performs one or more aggregate operations over the entire DataFrame. The example above calculates the average salary.
Adding a new column
df_with_new_column = df.withColumn("is_adult", df["age"] >= 18)
The withColumn method adds a new column, or overwrites an existing one, using an expression or value. In this case, a column is added indicating whether the person is an adult.
Delete a column
df_without_column = df.drop("is_adult")
The drop method removes a column from the DataFrame. The example removes the is_adult column.
Changing a Column's Data Type
df_with_casted_column = df.withColumn("age", df["age"].cast("integer"))
The data type of a column can be changed with the cast method, used inside withColumn.
Column Splitting
from pyspark.sql.functions import split

split_col = split(df["name"], " ")
df_with_split_columns = df.withColumn("first_name", split_col.getItem(0)) \
    .withColumn("last_name", split_col.getItem(1))
The split function from the pyspark.sql.functions module splits a string column into an array of substrings; getItem retrieves an element from that array.
Joining tables (JOIN)
joined_df = df1.join(df2, df1["id"] == df2["user_id"], how='inner')
JOIN combines two DataFrames based on one or more keys. The how parameter specifies the join type (for example, inner, outer, left_outer, right_outer).
Applying User Defined Functions (UDF)
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
def square(x):
    return x * x
square_udf = udf(square, IntegerType())
df_with_square = df.withColumn("squared_value", square_udf(df["value"]))
User Defined Functions (UDF) allow you to apply custom functions to the data in a DataFrame. This example creates a UDF to square a number.
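One practical detail worth adding: Spark passes SQL NULL values into a Python UDF as Python None, so it is good practice to guard against them. Below is a null-safe sketch of the same function; since the wrapped function is plain Python, it can be unit-tested without a Spark session (the null check is our own addition, not part of the original example):

```python
def square(x):
    # Spark hands SQL NULL to a Python UDF as None,
    # so return None instead of raising a TypeError
    if x is None:
        return None
    return x * x

print(square(7))     # 49
print(square(None))  # None
```

Once verified, the function can be wrapped exactly as above with udf(square, IntegerType()).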
These techniques provide the basis for data manipulation and analysis in Spark and can be combined to solve complex large data processing problems.
No. 3. Writing Data to Spark
Writing data to CSV files
df.write.format("csv").option("header", "true").save("path/to/save/file.csv")
This method saves the DataFrame in CSV format. The header option indicates that a row with column headers should be written to the file.
Writing data in Parquet format
df.write.parquet("path/to/save/file.parquet")
Parquet is a columnar data storage format. Writing to Parquet is fast and can save significant disk space.
Writing data in JSON format
df.write.json("path/to/save/file.json")
JSON is widely used to store structured data. Spark allows you to save data in JSON format, which is convenient for later use in web applications and systems that support JSON.
Writing to databases via JDBC
df.write \
.format("jdbc") \
.option("url", "jdbc:mysql://host:port/database") \
.option("dbtable", "table_name") \
.option("user", "username") \
.option("password", "password") \
.save()
This method allows you to save a DataFrame directly to a relational database via JDBC. This can be useful for integration with legacy systems and direct access to data via SQL.
Writing in ORC format
df.write.orc("path/to/save/file.orc")
ORC (Optimized Row Columnar) is a data storage format optimized for large volumes of data. It provides efficient compression and performance, making it suitable for use in large data lakes.
Writing with partitioning
df.write.partitionBy("year", "month").parquet("path/to/save")
This method splits the data by the specified columns (in this case, year and month), which simplifies data management and speeds up reads when only certain segments of the data are needed.
Writing in overwrite mode
df.write.mode("overwrite").parquet("path/to/save/file.parquet")
The overwrite mode tells Spark to delete existing data before writing the new data. This is useful when refreshing data in the warehouse.
Writing in append mode
df.write.mode("append").parquet("path/to/save/file.parquet")
In append mode, new data is appended to the existing data without deleting anything. This suits situations where data accumulates over time.
Writing to a Hive Table
df.write.saveAsTable("database.table_name")
The saveAsTable method saves a DataFrame directly as a Hive table, which integrates the data with the Hadoop ecosystem and makes it available to various Big Data applications.
Writing with insertInto
The insertInto method inserts data into the specified table. It requires that the structure of the DataFrame match the structure of the target table. Note that, unlike saveAsTable, insertInto resolves columns by position rather than by name, so the column order and types must line up; if they do not, the operation fails, or may silently write values into the wrong columns when the types happen to be compatible.
#Method parameters:
tableName: the name of the table into which the data will be inserted; it may include the database name (for example, database.table).
mode: the write mode, which can take the values:
append: data is added to the rows already in the table;
overwrite: existing data in the table is deleted before the new data is inserted (this behavior may depend on Spark and Hive configuration settings).
Adding data to a table
df.write.mode("append").insertInto("my_database.my_table")
In this example, the data from df is appended to the data already in the my_table table of the my_database database.
Overwriting data in a table
df.write.mode("overwrite").insertInto("my_database.my_table")
Here the data in my_table will be completely replaced by the data from df. It is important to note that, depending on Hive settings, the operation may not only replace the data in the specified table but also delete all previous partitions, which can affect the performance of data queries.
WARNING: DO NOT REPEAT THIS IN PRODUCTION. DANGEROUS!
insertInto and deleting partitions
A scenario in which insertInto deletes partitions.
Suppose we have a partitioned sales table in Hive, partitioned by year and month. If we use the insertInto method in overwrite mode to insert data into this table, it can delete all existing partitions unless specific partitions are targeted.
df.write.mode("overwrite").insertInto("database.sales")
#Why is this happening?
When Spark performs an overwrite operation without explicit partition targeting, it may interpret this as a request to replace all data in the table, including every partition. As a result, Spark deletes all existing partitions in the sales table and replaces them with the data from df. This behavior can be extremely undesirable, especially if the table stores a lot of data that was partitioned to optimize queries.
#How to prevent unwanted deletion of partitions?
1) Explicitly targeting partitions when writing.
By limiting the write to specific partitions, you control exactly what gets overwritten. Note that in PySpark, partitionBy cannot be combined with insertInto (Spark raises an error for that combination), so to overwrite data for only a specific month and year, use a SQL INSERT OVERWRITE statement with an explicit PARTITION clause:
df.createOrReplaceTempView("sales_updates")
spark.sql("INSERT OVERWRITE TABLE database.sales PARTITION (year=2024, month=1) SELECT * FROM sales_updates")
In this case, Spark rewrites only the specified partition and leaves the rest untouched. (The SELECT list must supply the table's non-partition columns; the year and month values here are only illustrative.)
2) Using configuration settings.
In some cases you can configure Spark and Hive to handle partitions more precisely. For example, the Spark setting spark.sql.sources.partitionOverwriteMode can be set to dynamic, which lets Spark determine dynamically which partitions should be overwritten based on the contents of the DataFrame.
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
df.write.mode("overwrite").insertInto("database.sales")
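The difference between the default (static) overwrite and dynamic partition overwrite can be modeled in a few lines of plain Python. This is a toy sketch of the semantics only, nothing Spark-specific; the partition keys and row contents are invented:

```python
# Toy model: a "table" is a dict mapping partition tuples to rows.
existing = {(2023, 1): ["jan rows"], (2023, 2): ["feb rows"]}
incoming = {(2023, 2): ["new feb rows"]}

def overwrite_static(table, new_data):
    # Like overwrite + insertInto without dynamic mode:
    # every old partition is dropped; only the new data survives.
    return dict(new_data)

def overwrite_dynamic(table, new_data):
    # Like partitionOverwriteMode=dynamic: only the partitions
    # present in the incoming data are replaced.
    result = dict(table)
    result.update(new_data)
    return result

print(overwrite_static(existing, incoming))   # January is gone
print(overwrite_dynamic(existing, incoming))  # January survives
```

The static variant is exactly why the warning above exists: partitions absent from the incoming DataFrame simply disappear.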
Recommendations.
Always check your Spark and Hive settings carefully when working with insertInto, especially when using overwrite.
When working with critical data, first test write operations on small amounts of data or in a test environment to avoid data loss.
If you do need overwrite, try to explicitly specify the partitions to be overwritten, to minimize the risk of unwanted data deletion.
This avoids data loss and makes the use of Spark for processing and storing large amounts of data more controlled and secure.
No. 3.1. Additional options for saving settings
partitionBy
Allows you to specify columns for partitioning data when saving.
Partitioning data consists of physically dividing it into certain keys (columns), which can significantly speed up read and write operations, as well as queries that filter data by these keys. Partitioning is especially useful in large distributed systems where managing access to data and processing it can be costly in terms of time and resources.
#Example of using partitionBy
df.write.partitionBy("country", "state").format("parquet").save("/path/to/data")
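Physically, the example above produces a Hive-style directory hierarchy of the form column=value under the base path. A small pure-Python sketch of that path layout (the base path and values mirror the made-up example above):

```python
def partition_path(base, **partitions):
    # Spark lays out partitioned data as base/col1=val1/col2=val2/...,
    # one directory level per partition column.
    parts = "/".join(f"{col}={val}" for col, val in partitions.items())
    return f"{base}/{parts}"

print(partition_path("/path/to/data", country="US", state="CA"))
# /path/to/data/country=US/state=CA
```

Queries filtering on country or state can then skip whole directories instead of scanning every file.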
#Parameters of the partitionBy method
columnNames: the columns by which the data will be divided. The number and choice of columns depend on the characteristics of the data and on the query patterns.
#Benefits of using partitionBy
Improved reading performance. Partitioning allows you to “pre-filter” data at the file system level, which reduces the amount of data loaded when executing queries.
Resource optimization. Partitioning helps manage resources by distributing data across files and directories, which optimizes storage and access to data.
Scalability. Partitioning improves application scalability because data processing and storage become more efficient.
#Recommendations for using partitionBy
Selecting partitioning keys. It is important to choose partitioning keys that are often used in filtering queries. This allows you to maximize the benefits of partitioning.
Mind partition sizes. Partitions that are too small or too large can reduce performance; aim for a uniform distribution of partition sizes.
Limit the number of partitions. Too many partitions leads to a large number of small files, which makes the file system hard to manage and can reduce performance.
bucketBy
Organizes data into buckets based on the specified columns, which can improve query performance.
Bucketing, or bucket partitioning, is a technique that divides data into manageable, evenly distributed parts. Data is distributed across buckets based on a hash of one or more columns: all rows with the same hash value go into the same bucket, which significantly speeds up table joins, because Spark can join locally within a single bucket, minimizing data movement between cluster nodes.
#Example of using bucketBy
df.write.bucketBy(42, "column1") \
    .sortBy("column2") \
    .saveAsTable("bucketed_table")
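To build intuition for how rows land in buckets, here is a toy pure-Python model of hash-based bucket assignment. Spark internally uses its own Murmur3-based hash; Python's built-in hash is just a stand-in to illustrate the principle that equal keys always end up in the same bucket:

```python
def bucket_of(key, num_buckets):
    # Equal keys always hash to the same value within one run,
    # so all rows with the same key land in the same bucket.
    return hash(key) % num_buckets

rows = ["dept_a", "dept_b", "dept_a", "dept_c"]
buckets = {}
for key in rows:
    buckets.setdefault(bucket_of(key, 4), []).append(key)

# Both "dept_a" rows are guaranteed to share a bucket; this
# locality is what lets Spark join two tables bucketed on the
# same key without a full shuffle.
```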
#Parameters of the bucketBy method
numBuckets: the number of buckets into which the data will be split. The right number depends on the data size and the cluster: too few buckets may not yield significant optimization, while too many can degrade performance due to the overhead of managing many small files.
columnNames: the columns from which the buckets are generated. These should be the columns frequently used in join or aggregation operations, to get the most benefit.
#Benefits of using bucketBy
Improved join performance. When joining two tables bucketed on the same columns, Spark can join the matching buckets independently of each other, which reduces query execution time.
Aggregation optimization. Aggregations over bucketed keys can be optimized, because the data is already grouped by the required keys.
Reduced data movement. Since the data is localized in buckets, data movement across the network (shuffle) during joins and aggregations is minimized.
#Recommendations for using bucketBy
Testing. Start with fewer buckets and increase the number as you see how performance changes.
Consistency. Use the same fields for bucketing in tables that are frequently joined with each other.
Avoid small buckets. Too many small buckets can result in increased processing overhead for many small files.
sortBy
Defines the columns by which data is sorted within each bucket when using bucketBy.
sortBy allows you to specify one or more columns by which the data is sorted within each bucket. Sorting data within buckets can greatly improve the efficiency of downstream operations such as joins and window functions, since the data that needs to be processed together ends up physically closer together.
#Example of using sortBy
df.write.bucketBy(10, "department_id").sortBy("employee_salary").saveAsTable("employees_bucketed_sorted")
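A point worth emphasizing: sortBy orders data within each bucket, not globally across the table. A toy pure-Python sketch of the distinction (the bucket contents are invented salary values):

```python
# Rows already distributed into buckets by bucketBy; sortBy then
# sorts each bucket's contents independently, not the whole dataset.
buckets = {0: [50000, 30000, 70000], 1: [45000, 20000]}
sorted_within = {b: sorted(rows) for b, rows in buckets.items()}

print(sorted_within)  # {0: [30000, 50000, 70000], 1: [20000, 45000]}
# Each bucket is ordered, but there is no global order across buckets.
```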
#Parameters of the sortBy method
columnNames: the list of columns to sort by within each bucket. The order of the columns in the list determines the sorting priority.
#Benefits of using sortBy
Improved query performance. Sorting data within buckets allows Spark to perform joins and aggregations more efficiently because the data needed for these operations is physically closer together.
Optimizing data scanning. When queries require access to sorted data (for example, when using window functions), presorted buckets can significantly speed up processing.
#Recommendations for using sortBy
Limited use. Use sortBy only when there is a clear understanding of how queries will use the data; unnecessary sorting adds overhead when writing the data.
Combination with bucketBy. sortBy is most often used together with bucketBy, to optimize operations that benefit from pre-sorted data.
Performance testing. It is always a good idea to run performance tests to confirm that applying sortBy really improves query response time, especially in large and complex data lakes.
In conclusion
In this article, we examined in detail the main functions of Apache Spark, which allow you to effectively work with large amounts of data. From reading different data formats to complex processing operations and finally various ways to store the processed data, all these features make Spark a powerful tool for data analysis.
Using data reading capabilities facilitates integration with various data sources and allows you to easily load data into your work environment.
Processing functions such as filtering, grouping, sorting, and aggregation provide flexible tools for manipulating data and extracting useful information.
Various data storage options help optimize both final data storage and subsequent access to it.
I hope that the examples presented in this article will help beginners quickly get used to Apache Spark and begin their journey in the field of big data processing. In addition to the functionality of Spark, I also touched on best practices and recommendations for the optimal use of this tool, which will undoubtedly come in handy in real projects.
Remember that regularly updating your knowledge and being able to adapt to new technologies are key skills for a data scientist. So keep learning and experimenting with Apache Spark to find new and powerful ways to work with data. Good luck in your endeavors and see you again in the world of big data!
Follow new articles, and do not forget to share your successes and discoveries with colleagues and the community. Spark is not only a technology, but also an actively developing community of specialists that is always open to sharing knowledge and experience.