SPARK for “kids”

Examples of Python code for working with Apache Spark for the “little ones” (and a few “pictures”).

In the previous article we looked at an example of creating a Spark session; here we'll talk about Spark's capabilities and functions for data processing. And now I can send this article to all my newcomers.

This article provides an overview of the main features of Apache Spark and discusses how they can be used in real-world data processing tasks. Apache Spark is a powerful and flexible system for processing large volumes of data, offering a wide range of capabilities for analytics and machine learning. In our review, we'll focus on the key features of reading, processing, and storing data, showcasing code examples that will help newbies get up to speed and start using these features in their projects.

It is important to understand that while the information provided here is extensive and useful, to fully understand all aspects of working with Spark you should also read the official documentation. It contains a complete guide to all features and functionality, as well as detailed instructions for installation, configuration, and optimization of the system. To explore further, we recommend visiting the official Apache Spark documentation page.

With basic instructions in hand and examples from this article, you will be well prepared to get started.

The article is organized so that you can quickly find the functions and capabilities you need.

No. 1. Reading.

No. 2. Processing: data analysis (filtering and sorting, aggregation).

No. 3. Writing Data to Spark.

No. 3.1. Additional options when saving data

Epilogue

No. 1. Reading

Reading from a table

df = spark.read.table("s_schema.new_table")

This method is used to load data from a table into a Spark DataFrame. This is a convenient way to get started working with the data in your data warehouse. It's especially useful when you're working with well-structured data that has a predefined schema.

Reading from a directory

df = spark.read.parquet('/schema/dir/db/table_name/partition_1/')

This method allows you to load data directly from the file system. Parquet is an efficient and compact storage format that is suitable for working with large volumes of data, as it provides high data compression and optimized performance through columnar storage.

Reading via a SQL query (using Hive as an example)

df = spark.sql('SELECT * FROM SCHEMA.NEW_TABLE')

Using SQL queries in Spark allows you to take advantage of the full power of SQL to analyze the data residing in your Spark cluster. This is especially useful if you are already familiar with SQL. Spark SQL allows you to perform complex queries, joins, and aggregations, making your code more readable.
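For example, a join with aggregation can be written entirely in SQL (a sketch; the schema.users and schema.orders tables and their columns are illustrative):

df = spark.sql("""
    SELECT u.city,
           COUNT(*)      AS orders_cnt,
           AVG(o.amount) AS avg_amount
    FROM schema.users u
    JOIN schema.orders o ON o.user_id = u.id
    GROUP BY u.city
    ORDER BY orders_cnt DESC
""")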

Reading CSV files

df = spark.read.format("csv").option("header", "true").load("path/to/file.csv")

This method is used to read CSV files. The header option indicates whether the first row contains column headers. The load method reads the file at the specified path. This is a convenient way to work with tabular data in CSV format.
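If the file uses a different separator, or you want Spark to infer the column types, additional options can be passed (a sketch; the separator is illustrative):

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", ";") \
    .option("inferSchema", "true") \
    .load("path/to/file.csv")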

Reading JSON files

df = spark.read.json("path/to/file.json")

The read.json method allows you to load JSON files directly into a DataFrame. Spark automatically interprets the JSON structure and converts it into a table. This is especially useful when working with data that has a complex nested structure.
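Nested fields can then be accessed with dot notation, and multi-line ("pretty-printed") JSON files require the multiLine option. A small sketch, assuming a hypothetical nested address.city field:

df = spark.read.option("multiLine", "true").json("path/to/file.json")
df_flat = df.select("name", "address.city")  # pull the nested field address.city into its own column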

Reading data from JDBC sources

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:port/database") \
    .option("dbtable", "schema.table") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

This method allows you to connect to databases via JDBC. You can specify the connection URL, table name, username and password. This is ideal for cases where data is already stored in a relational database and you want to analyze it using Spark.
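For large tables, the read can be parallelized by specifying a numeric column to split on and the number of partitions (a sketch; the boundaries and the id column are illustrative):

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:port/database") \
    .option("dbtable", "schema.table") \
    .option("user", "username") \
    .option("password", "password") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .option("numPartitions", "10") \
    .load()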

Reading Avro files

df = spark.read.format("avro").load("path/to/file.avro")

Avro is a binary serialization format that provides compact, fast, and efficient data storage. Reading data in the Avro format allows for easy integration with systems that use this format to store data.

Reading text files

df = spark.read.text("path/to/textfile.txt")

If your data is in plain text format, this method loads the text file as a DataFrame with a single column, value, containing the lines of text. This is useful for text mining or log file processing tasks.
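For example, log lines can be filtered and split into fields right after loading (a sketch, assuming a hypothetical space-separated log format with a leading timestamp):

from pyspark.sql.functions import col, split

logs = spark.read.text("path/to/textfile.txt")
# Keep only lines containing ERROR and extract the first space-separated token as a timestamp
errors = logs.filter(col("value").contains("ERROR")) \
    .withColumn("timestamp", split(col("value"), " ").getItem(0)) \
    .select("timestamp", "value")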

No. 2. Processing: data analysis (filtering and sorting, aggregation)

Data filtering

filtered_df = df.filter(df["age"] > 30)

The filter method allows you to select rows based on a given condition; it is analogous to the SQL WHERE clause. The example above selects all users over 30 years old.

Grouping data

grouped_df = df.groupBy("city").count()

The groupBy method groups data by one or more columns so that aggregate functions (for example, count, max, min, avg) can then be applied. This example counts the number of records for each city.
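Several aggregate functions can be applied at once by combining groupBy with agg (a sketch that reuses the hypothetical age and salary columns):

from pyspark.sql import functions as F

city_stats_df = df.groupBy("city").agg(
    F.count("*").alias("people"),        # number of records per city
    F.avg("age").alias("avg_age"),       # average age per city
    F.max("salary").alias("max_salary")  # highest salary per city
)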

Sorting data

sorted_df = df.orderBy("age")

The orderBy method sorts data by one or more columns. In the example, the data is sorted by age.

Data Aggregation

aggregated_df = df.agg({"salary": "avg"})

The agg method performs one or more aggregate operations on the entire DataFrame. The example above calculates the average salary.

Adding a new column

df_with_new_column = df.withColumn("is_adult", df["age"] >= 18)

withColumn adds a new column or overwrites an existing one using an expression or value. In this case, a column is added indicating whether the person is an adult.

Delete a column

df_without_column = df.drop("is_adult")

The drop method removes a column from the DataFrame. The example removes the is_adult column.

Changing a Column's Data Type

df_with_casted_column = df.withColumn("age", df["age"].cast("integer"))

Changing the data type of a column is done with the cast method, used inside withColumn.

Column Splitting

import pyspark.sql.functions

split_col = pyspark.sql.functions.split(df["name"], " ")
df_with_split_columns = df.withColumn("first_name", split_col.getItem(0))
df_with_split_columns = df_with_split_columns.withColumn("last_name", split_col.getItem(1))

The split function from the pyspark.sql.functions module splits a string column into an array of substrings; getItem retrieves individual elements from that array.

Joining tables (JOIN)

joined_df = df1.join(df2, df1["id"] == df2["user_id"], how='inner')

JOIN combines two DataFrames based on one or more keys. The how parameter specifies the type of join (for example, inner, outer, left_outer, right_outer).
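For example, a left join keeps every row of the left DataFrame, and the duplicated join key from the right side can be dropped afterwards (a sketch with the same df1 and df2):

# Rows of df1 without a match in df2 get NULLs in df2's columns
left_joined_df = df1.join(df2, df1["id"] == df2["user_id"], how="left_outer") \
    .drop(df2["user_id"])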

Applying User Defined Functions (UDF)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def square(x):
    return x * x

square_udf = udf(square, IntegerType())
df_with_square = df.withColumn("squared_value", square_udf(df["value"]))

User Defined Functions (UDF) allow you to apply custom functions to the data in a DataFrame. This example creates a UDF to square a number.
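Note that Python UDFs are executed row by row outside the JVM, so when an equivalent built-in column expression exists it is usually faster. The same result as the UDF above can be obtained without a UDF:

# Built-in column arithmetic stays inside the JVM and avoids UDF serialization overhead
df_with_square = df.withColumn("squared_value", df["value"] * df["value"])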

These techniques provide the basis for data manipulation and analysis in Spark and can be combined to solve complex large data processing problems.

No. 3. Writing Data to Spark

Writing data to CSV files

df.write.format("csv").option("header", "true").save("path/to/save/file.csv")

This method saves the DataFrame in CSV format. The header option indicates that a row with column headers should be written to the file.

Writing data in Parquet format

df.write.parquet("path/to/save/file.parquet")

Parquet is a columnar data storage format. Writing data in Parquet is fast and saves a significant amount of disk space.

Writing data in JSON format

df.write.json("path/to/save/file.json")

JSON is widely used to store structured data. Spark allows you to save data in JSON format, which is convenient for later use in web applications and systems that support JSON.

Writing to databases via JDBC

df.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://host:port/database") \
    .option("dbtable", "table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

This method allows you to save a DataFrame directly to a relational database via JDBC. This can be useful for integration with legacy systems and direct access to data via SQL.

Writing in ORC format

df.write.orc("path/to/save/file.orc")

ORC (Optimized Row Columnar) is a data storage format optimized for large volumes of data. It provides efficient compression and performance, making it suitable for use in large data lakes.

Writing with partitioning

df.write.partitionBy("year", "month").parquet("path/to/save")

This method partitions the data by the specified columns (in this case, year and month), which simplifies data management and optimizes reading when you only need to access certain segments of the data.

Writing in overwrite mode

df.write.mode("overwrite").parquet("path/to/save/file.parquet")

The overwrite mode tells Spark to delete existing data before writing new data. This is useful when updating data in the warehouse.

Writing in append mode

df.write.mode("append").parquet("path/to/save/file.parquet")

In append mode, new data is added to the existing data without deleting it. This is suitable for situations where data must accumulate over time.

Writing to a Hive Table

df.write.saveAsTable("database.table_name")

The saveAsTable method saves a DataFrame directly into a Hive table, which integrates the data with the Hadoop ecosystem and allows it to be used in various Big Data applications.

Writing with insertInto

The insertInto method inserts data into the specified table. It requires that the structure of the DataFrame match the structure of the target table: the columns are matched by position rather than by name, so the column order and data types must line up. If the structures do not match, the operation will fail.

#Method parameters:

tableName: the name of the table into which the data will be inserted. This name may include the database name (for example, database.table).

mode: the write mode, set via .mode() before calling insertInto; in the examples below it takes the values append or overwrite.
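Since insertInto matches columns by position, it can help to align the DataFrame's column order with the target table before writing (a sketch that reuses the illustrative my_database.my_table and assumes df has the same column names as the table):

# Read the target table's column order and reorder df to match it before inserting
target_columns = spark.table("my_database.my_table").columns
df.select(*target_columns).write.mode("append").insertInto("my_database.my_table")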

Adding data to a table

df.write.mode("append").insertInto("my_database.my_table")

In this example, data from df is appended to the data that already exists in the my_table table of the my_database database.

Overwriting data in a table

df.write.mode("overwrite").insertInto("my_database.my_table")

Here the data in my_table is completely replaced by the data from df. It is important to note that, depending on the Hive settings, the operation may not only replace the data in the specified table but also delete all existing partitions, which may affect the performance of queries against the data.

WARNING DO NOT REPEAT – DANGEROUS TO LIFE!!

insertInto and deleting partitions

A scenario where using insertInto deletes partitions.

Let's say we have a partitioned sales table in Hive, partitioned by year and month. If we use insertInto in overwrite mode to insert data into this table, this may result in all existing partitions being deleted if we do not restrict the write to specific partitions.

df.write.mode("overwrite").insertInto("database.sales")

#Why is this happening?

When Spark performs an overwrite operation without the target partitions being explicitly restricted, it may interpret this as a request to replace all data in the table, including all partitions. As a result, Spark will delete all existing partitions in the sales table and replace them with the data from df. This behavior can be extremely undesirable, especially if the table stores a lot of data divided into partitions to optimize queries.

#How to prevent unwanted deletion of partitions?

1) Explicitly specifying the partitions to overwrite.

Note that insertInto cannot be combined with partitionBy: the partition columns are already defined by the table, and Spark raises an error if you try. To overwrite data only for a specific month and year, you can name the partition explicitly in an INSERT OVERWRITE ... PARTITION statement in Spark SQL (the partition values and the column list below are illustrative; list all columns of sales except the partition columns):

df.createOrReplaceTempView("sales_update")
spark.sql("INSERT OVERWRITE TABLE database.sales PARTITION (year = 2024, month = 1) SELECT order_id, amount FROM sales_update")

In this case, Spark rewrites only the partition matching the specified year and month, and the rest remain untouched.

2) Using configuration settings.

In some cases, you can customize the behavior of Spark and Hive to work more precisely with partitions. For example, the spark.sql.sources.partitionOverwriteMode setting can be set to dynamic, which allows Spark to determine dynamically which partitions should be overwritten based on the contents of the DataFrame.

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
df.write.mode("overwrite").insertInto("database.sales")

Recommendations.

Use one of the approaches above whenever you write to a partitioned table in overwrite mode. This will help you avoid data loss and ensures a more controlled and safe use of Spark for processing and storing large amounts of data.

No. 3.1. Additional options when saving data

partitionBy

Allows you to specify columns for partitioning data when saving.

Partitioning physically divides the data by certain keys (columns), which can significantly speed up read and write operations, as well as queries that filter on these keys. Partitioning is especially useful in large distributed systems, where accessing and processing data can be costly in terms of time and resources.

#Example of using partitionBy

df.write.partitionBy("country", "state").format("parquet").save("/path/to/data")

#Parameters of the partitionBy method

columnNames: Columns by which the data will be divided. The number and choice of columns for partitioning depend on the characteristics of the data and query requirements.
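The main payoff comes at read time: a filter on the partition columns lets Spark scan only the matching directories (partition pruning). A small sketch that reads back the data written above (the country and state values are illustrative):

from pyspark.sql.functions import col

# Only the country=US/state=CA directories are scanned thanks to partition pruning
us_ca_df = spark.read.parquet("/path/to/data") \
    .filter((col("country") == "US") & (col("state") == "CA"))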


bucketBy

Organizes data into buckets based on the specified columns, which can improve query performance.

Bucketing or bucket partitioning is a technique that divides data into manageable and evenly distributed parts. Data is distributed across buckets based on a hash function of one or more columns. All data with the same hash value goes into one bucket, which significantly speeds up table join operations because Spark can perform joins locally within a single bucket, minimizing data movement between cluster nodes.

#Example of using bucketBy

df.write.bucketBy(42, "column1") \
    .sortBy("column2") \
    .saveAsTable("bucketed_table")

#Parameters of the bucketBy method

numBuckets: the number of buckets the data is split into.

columnNames: the column (or columns) whose hash value determines which bucket each row goes into.
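When two tables are bucketed by the join key into the same number of buckets, Spark can join them with far less shuffling of data between nodes (a sketch, assuming a hypothetical second table other_bucketed_table bucketed by column1 into 42 buckets as well):

left = spark.table("bucketed_table")
right = spark.table("other_bucketed_table")  # hypothetical table, bucketed the same way as bucketed_table
joined = left.join(right, on="column1")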


sortBy

Defines the columns that the data will be sorted by within each bucket when using bucketBy.

sortBy allows you to specify one or more columns by which the data will be sorted within each bucket. Sorting data within buckets can greatly improve the efficiency of downstream operations such as joins and window functions, since data that needs to be processed together is physically closer together.

#Example of using sortBy

df.write.bucketBy(10, "department_id").sortBy("employee_salary").saveAsTable("employees_bucketed_sorted")

#Parameters of the sortBy method

columnNames: a list of columns to sort by within each bucket. The order of the columns in the list determines the sorting priority.


Epilogue

In this article, we examined in detail the main functions of Apache Spark, which allow you to effectively work with large amounts of data. From reading different data formats to complex processing operations and finally various ways to store the processed data, all these features make Spark a powerful tool for data analysis.

I hope that the examples presented in this article will help beginners quickly get used to Apache Spark and begin their journey in the field of big data processing. In addition to the functionality of Spark, I also touched on best practices and recommendations for the optimal use of this tool, which will undoubtedly come in handy in real projects.

Remember that regularly updating your knowledge and being able to adapt to new technologies are key skills for a data scientist. So keep learning and experimenting with Apache Spark to find new and powerful ways to work with data. Good luck in your endeavors and see you again in the world of big data!

Keep an eye out for new articles, and don't forget to share your successes and discoveries with colleagues and the community. Spark is not only a technology but also an actively developing community of specialists that is always open to sharing knowledge and experience.
