You already have everything you need to debug your queries.

Spark is the most widely used big data computing framework, capable of running jobs over petabytes of data. Spark provides a set of web UIs that you can use to track resource consumption and the health of your Spark cluster. Most of the problems we encounter while running a job can be debugged by going to the Spark UI.
spark2-shell --queue=P0 --num-executors 20

Spark context Web UI available at http://<hostname>:<port>
Spark context available as 'sc'
Spark session available as 'spark'
In this article, I will try to demonstrate how to debug a Spark job using only the Spark UI. I will run a few Spark jobs and show how the Spark UI reflects their execution, and I will also share some tips and tricks along the way.

This is what the Spark UI looks like.
We’ll start with the SQL tab, which gives enough information for an initial overview. If you work with the RDD API directly, the SQL tab may be absent.

And here is the query that I run as an example:
spark.sql("select id, count(1) from table1 group by id").show(10, false)
The annotations on the screenshot, from top to bottom:

<- The query launched 3 jobs, and the query itself completed in 21 seconds.
<- Parquet files are scanned; they contain 23.7M rows in total.
<- This is the work done within each partition:
1. generate (hash(id), count) pairs;
2. group by id and sum the counts, which looks like this:
id = hash(125), count = 1000
id = hash(124), count = 900 …
<- The data above is exchanged (shuffled) based on the hash of the id column, so that each hash ends up in a single partition.
<- The data in each partition is summed up, and the final count is returned.
Now let’s map this onto the physical query plan. The physical plan can be found under the SQL DAG when you expand the “details” section. We must read the plan from the bottom up.
== Physical Plan ==
CollectLimit 11
+- *(2) HashAggregate(keys=[id#1], functions=[count(1)], output=[id#1, count(1)#79])
   +- Exchange hashpartitioning(id#1, 200)
      +- *(1) HashAggregate(keys=[id#1], functions=[partial_count(1)], output=[id#1, count#83L])
         +- *(1) FileScan parquet [id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://<node>:<port><location>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string>
Here’s how to read the plan:
Scan the Parquet files. Pay attention to PushedFilters; I will demonstrate what it means later.
Build a HashAggregate on the grouping keys. Pay attention to partial_count: the count is partial because the aggregation was done within each task, without a shuffle, so it does not yet cover the full set of values.
Exchange: the partial results are redistributed based on the key, in this case id.
The final HashAggregate computes the complete count.
CollectLimit returns the result.
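To make the plan concrete, here is a minimal pure-Python sketch (not Spark code, just an illustration) of the same three steps: per-partition partial counts, an exchange keyed by the hash of id, and the final aggregation:

```python
from collections import defaultdict

def partial_count(partition):
    """Stage 1: each task counts ids within its own partition (partial_count)."""
    counts = defaultdict(int)
    for row_id in partition:
        counts[row_id] += 1
    return dict(counts)

def exchange(partials, num_partitions):
    """Exchange hashpartitioning: route each (id, count) pair to partition hash(id) % n,
    so every id ends up in exactly one partition."""
    shuffled = [defaultdict(int) for _ in range(num_partitions)]
    for part in partials:
        for row_id, cnt in part.items():
            shuffled[hash(row_id) % num_partitions][row_id] += cnt
    return shuffled

def final_count(shuffled):
    """Stage 2: merge the partial counts; partitions now hold disjoint ids."""
    result = {}
    for part in shuffled:
        result.update(part)
    return result

partitions = [["a", "b", "a"], ["b", "b", "c"]]
result = final_count(exchange([partial_count(p) for p in partitions], 2))
# result == {"a": 2, "b": 3, "c": 1}
```

The shuffle boundary between `partial_count` and `final_count` is exactly where the Exchange node sits in the physical plan.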
Now that we’ve figured that out, let’s take a look at PushedFilters. Spark supports predicate pushdown: any filters you apply are pushed down to the data source wherever possible. To demonstrate this, let’s look at another version of the query.
spark.sql("select id, count(1) from table1 where status = 'false' group by id").show(10, false)

And this is its plan:
+- *(2) HashAggregate(keys=[id#1], functions=[count(1)], output=[id#1, count(1)#224])
   +- Exchange hashpartitioning(id#1, 200)
      +- *(1) HashAggregate(keys=[id#1], functions=[partial_count(1)], output=[id#1, count#228L])
         +- *(1) Project [id#1]
            +- *(1) Filter (isnotnull(status#3) && (status#3 = false))
               +- *(1) FileScan parquet [id#1,status#3] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://mr25p01if-ingx03010101.mr.if.apple.com:50001/home/hadoop/work/news/20200..., PartitionFilters: [], PushedFilters: [IsNotNull(status), EqualTo(status,false)], ReadSchema: struct<id:string,status:string>
Notice the changes from the previous plan.
We now see something in PushedFilters: a null check and an equality check. The filter on the status column is pushed down to the source, i.e. non-matching rows are skipped while the data is being read, and only the result is carried over to the next steps.
Can we use filters to reduce the total amount of data (or files) read?
Yes, we can. In both examples above, the total amount of data read is ~23.8M rows. To reduce it, we can use the magic of the Parquet format. Parquet has row groups, which carry statistics that can be used to skip entire row groups or files, so that those files are not read at all. You can read about how to do this in my other article on Medium, Insights Into Parquet Storage…
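To illustrate the idea, here is a simplified sketch (not the actual Parquet reader) of how per-row-group min/max statistics let a reader discard whole groups that cannot satisfy a pushed-down predicate:

```python
# Each row group stores min/max statistics per column; a reader can skip
# groups whose [min, max] range cannot possibly contain the filtered value.
row_groups = [
    {"stats": {"min": 100, "max": 199}, "rows": list(range(100, 200))},
    {"stats": {"min": 200, "max": 299}, "rows": list(range(200, 300))},
    {"stats": {"min": 300, "max": 399}, "rows": list(range(300, 400))},
]

def scan_with_pushdown(groups, value):
    """Read only the row groups whose statistics may contain `value`."""
    groups_read = 0
    matches = []
    for g in groups:
        if g["stats"]["min"] <= value <= g["stats"]["max"]:
            groups_read += 1
            matches.extend(r for r in g["rows"] if r == value)
    return matches, groups_read

matches, groups_read = scan_with_pushdown(row_groups, 250)
# only 1 of the 3 row groups is actually read
```

Real Parquet readers apply the same test per file and per row group, which is why a selective pushed-down filter can dramatically cut the input size.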
The Executors tab shows the number of currently active executors in your Spark session.
spark2-shell --queue=P0 --driver-memory 20g --executor-memory 20g --num-executors 40

I requested 40 executors for the session, but at launch you can see that only 10 executors are active. This could be because the hosts are down, or because Spark does not need that many executors yet. It can also delay task scheduling: if you need 40 executors but only have 10, concurrency will suffer.
The Environment tab contains detailed information about all the configuration parameters that the spark session is currently using.
Note how the parameters I set earlier are reflected here. This is useful, if only to make sure that the configuration you provided was actually accepted.

The Storage tab displays information about one of the most talked-about Spark features: caching. There are many articles on the Internet with differing opinions on whether or not to cache. Luckily, this article isn’t about when to cache; it is about what happens when we do.
But before that, let’s go back a bit and spend a few minutes on some of the basics of caching.
There are two ways to cache a DataFrame: df.cache() and df.persist(). The persist method additionally lets you specify a storage level.
Under the hood, cache simply calls the persist method. Referring to the source code:
def cache(): this.type = persist()

/**
 * Persist this Dataset with the given storage level.
 * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,
 *                 `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,
 *                 `MEMORY_AND_DISK_2`, etc.
 * @group basic
 * @since 1.6.0
 */
DISK_ONLY: store (persist) data on disk in serialized format only.
MEMORY_AND_DISK: keep data in memory, and if there is not enough memory, the preempted blocks will be saved to disk.
MEMORY_ONLY_SER: this storage level stores the RDD as serialized Java objects (one byte array per partition). It is more compact than deserialized objects, but increases CPU overhead.
MEMORY_AND_DISK_SER: Similar to MEMORY_ONLY_SER, but written to disk when the data does not fit in memory.
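For reference, here is a small Python sketch of the flags behind these levels, modeled on org.apache.spark.storage.StorageLevel (use disk, use memory, deserialized, replication):

```python
# Each Spark storage level is just a combination of four flags; this mapping
# mirrors the definitions in org.apache.spark.storage.StorageLevel.
STORAGE_LEVELS = {
    "DISK_ONLY":           dict(use_disk=True,  use_memory=False, deserialized=False, replication=1),
    "MEMORY_ONLY":         dict(use_disk=False, use_memory=True,  deserialized=True,  replication=1),
    "MEMORY_AND_DISK":     dict(use_disk=True,  use_memory=True,  deserialized=True,  replication=1),
    "MEMORY_ONLY_SER":     dict(use_disk=False, use_memory=True,  deserialized=False, replication=1),
    "MEMORY_AND_DISK_SER": dict(use_disk=True,  use_memory=True,  deserialized=False, replication=1),
    "MEMORY_AND_DISK_2":   dict(use_disk=True,  use_memory=True,  deserialized=True,  replication=2),
}
```

The `_2` suffix only changes the replication factor; the `_SER` suffix only flips the deserialized flag.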
Let’s use df.cache in our example and see what happens.

df.cache() -> Nothing appears on the Storage tab yet. As you can guess, this is due to lazy evaluation: nothing is cached until an action runs.

After triggering an action, we see the cached data on the Storage tab. The size in memory is 5.2 GB, while my file on disk is only 2 GB… hmmm… what happened here?
hadoop dfs -dus <dirName>
2,134,751,429  6,404,254,287  <dirName>
This is because the data in memory is stored deserialized and uncompressed, which takes more space than it does on disk.
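Here is a toy pure-Python illustration of the effect (it does not reproduce Spark’s actual memory layout, only the principle): compressed bytes standing in for the on-disk file versus the per-object footprint of deserialized rows in memory:

```python
import gzip
import sys

# On disk, columnar files are compressed; in the cache, rows are held as
# deserialized, uncompressed objects with per-object overhead.
rows = [f"user_{i % 100},false" for i in range(10_000)]

on_disk = len(gzip.compress("\n".join(rows).encode()))   # compressed bytes
in_memory = sum(sys.getsizeof(r) for r in rows)          # object footprints

# The in-memory footprint is many times the compressed on-disk size.
```

The exact ratio depends on the data and the format, but the direction of the difference is the same one we saw above (2 GB on disk vs 5.2 GB cached).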
So when you want to decide whether to cache or not, keep that in mind.
I’ve seen some smart articles about whether to cache or not. It’s a good idea to check them out.
Next we will look at the Jobs and Stages tabs; the causes of many problems can be debugged there.
spark.sql("select is_new_user,count(1) from table1 group by is_new_user").show(10,false)
I can see that 3 jobs ran for the above query, but 2 of them are marked as skipped. This usually means that the data was retrieved from cache and there was no need to re-execute that stage. In addition, Spark performs many auxiliary jobs while evaluating data, and skipped stages can be related to this.

Let’s dive deeper into the job that was not skipped. This is the DAG visualization for that job.

We can clearly see that this job consists of two stages separated by a shuffle (exchange) operation. A stage boundary means that the intermediate data was written out to be read by the next stage.
Let’s dive into the stages tab.
The first thing to always check is the summary metrics for tasks. Click “Show Additional Metrics” to see more of them. This shows the minimum, median and maximum for many important measures. In an ideal world, the minimum value should be close to the maximum.
Here are some points to note:
→ Duration: In our example, the minimum and maximum durations are 0.4 and 4 seconds respectively. This may be due to several reasons, and we will try to debug them in the points below.
→ Task deserialization time: in our example, a noticeable amount of time is spent just deserializing tasks. One of the main reasons turned out to be garbage collection in the executors: I had other processes running that cached data, which triggered garbage collection. GC is given the highest priority and pauses all running processes in order to do its work. If you see this while your process is not consuming much memory, the first step to fixing it might be to talk to your administrator / ops team.
→ Scheduler delay: The maximum scheduler delay is 0.4 seconds. This means that one of the tasks had to wait another 0.4 seconds for submission. Whether this value is large or small depends on your specific use case.
→ The input size is evenly distributed. This is very good: all tasks read roughly the same amount of data. This is one of the most important things to check when hunting for a skewed query; skew shows up in the “Shuffle Read” column of the summary metrics. The simplest way to fix such problems is to add a salt to the group-by key so the data can be processed in parallel, and then aggregate once more with the salt removed. This principle can be applied in many forms to solve data skew.
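Here is a minimal pure-Python sketch of the salting trick (not Spark code): salt the key so a hot key is spread over several partial groups, aggregate the salted keys, then strip the salt and aggregate again:

```python
import random
from collections import defaultdict

NUM_SALTS = 4  # how many ways to split each hot key

def salted(key):
    """Append a random suffix so one hot key becomes up to NUM_SALTS keys."""
    return f"{key}#{random.randrange(NUM_SALTS)}"

rows = ["hot_id"] * 1000 + ["rare_id"] * 10   # heavily skewed input

# Stage 1: aggregate by salted key -> at most NUM_SALTS partial rows per key,
# so the hot key's work can be spread over several tasks.
partial = defaultdict(int)
for key in rows:
    partial[salted(key)] += 1

# Stage 2: strip the salt and combine the (now small) partial results.
final = defaultdict(int)
for salted_key, cnt in partial.items():
    final[salted_key.rsplit("#", 1)[0]] += cnt
```

The second aggregation handles at most NUM_SALTS rows per original key, so it stays cheap even when the raw data is badly skewed.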
Another thing to look out for is the locality level:

* PROCESS_LOCAL -> the task runs in the same JVM process as the source data
* NODE_LOCAL -> the task runs on the same machine as the source data
* RACK_LOCAL -> the task runs in the same rack as the source data
* NO_PREF (displayed as ANY) -> the task has no locality preference, or its preference cannot be satisfied
Let’s say we are consuming data from a Cassandra node inside a three-node Spark cluster with nodes X, Y and Z, and Cassandra runs on node X. For node X, all data will be marked NODE_LOCAL. This means that once every core on X is busy, we are left with tasks whose preferred location is X, but we only have free slots on Y and Z. Spark has only two options: wait until cores become available on X, or downgrade the locality level of the tasks, find room for them elsewhere, and accept the penalty of non-local execution.
The spark.locality.wait parameter describes how long to wait before downgrading tasks that could potentially run at a higher locality level to a lower one. This parameter is essentially our estimate of how much waiting for a local slot is worth. The default is 3 seconds, which means that in our Cassandra example, once our co-located node X is saturated with tasks, our other machines Y and Z will sit idle for 3 seconds before tasks that could have been NODE_LOCAL are downgraded to ANY and launched.

Here is sample code for this.
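A hypothetical launch that tunes these waits might look as follows (the values are illustrative, not recommendations; spark.locality.wait.node and spark.locality.wait.rack are the per-level overrides of the global setting):

```shell
# spark.locality.wait      - global wait before downgrading a locality level
# spark.locality.wait.node - override for the NODE_LOCAL level
# spark.locality.wait.rack - override for the RACK_LOCAL level
spark2-shell \
  --queue=P0 \
  --num-executors 40 \
  --conf spark.locality.wait=6s \
  --conf spark.locality.wait.node=3s \
  --conf spark.locality.wait.rack=0s
```

Setting a wait to 0s disables waiting at that level entirely, which trades locality for scheduling latency.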
I hope this article serves you as a guide to debugging Spark UI to troubleshoot Spark performance issues. There are many additional features in Spark 3 that are worth looking at too.
It’s also a good idea to read the official Spark UI documentation.
You can also contact me at Linkedin.