I'm running into an issue when converting a DataFrame to an RDD. The stage that performs this conversion uses a total of 200 tasks, while most stages before it use many more, and I'm having trouble understanding why it uses this number and whether I need to increase it to improve performance.
The program uses Spark 2.1.0 and runs on a YARN cluster with 250 executors.
These are the lines where the DF is converted to RDD:
val predictionRdd = selectedPredictions
  .withColumn("probabilityOldVector", convertToOldVectorUdf($"probability"))
  .select("mid", "probabilityOldVector")
  .rdd
This results in the previously mentioned 200 tasks as seen in the active stage in the following screenshot.
It has basically been stuck here for I don't know how long, while the other two (finished) stages used significantly more tasks.
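For context, convertToOldVectorUdf converts the new ml probability vectors to the old mllib vector type (which MultivariateOnlineSummarizer expects later on); a minimal version, not necessarily identical to the actual code, would be:

import org.apache.spark.ml.linalg.{Vector => NewVector}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.sql.functions.udf

// Sketch: wrap the standard ml -> mllib vector conversion in a UDF so it can
// be applied to the "probability" column.
val convertToOldVectorUdf = udf { v: NewVector => OldVectors.fromML(v) }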
One thing I tried was performing a repartition right before converting it to RDD:
val predictionRdd = selectedPredictions
  .withColumn("probabilityOldVector", convertToOldVectorUdf($"probability"))
  .select("mid", "probabilityOldVector")
  .repartition(2000)
  .rdd

// OldVector is the old mllib vector type (org.apache.spark.mllib.linalg.Vector);
// MultivariateOnlineSummarizer comes from org.apache.spark.mllib.stat.
val avgPredictions = predictionRdd
  .map(row => (row.getAs[String]("mid"), row.getAs[OldVector]("probabilityOldVector")))
  .aggregateByKey(new MultivariateOnlineSummarizer)(
    (agg, v) => agg.add(v),           // add one probability vector to the running summary
    (agg1, agg2) => agg1.merge(agg2)  // merge partial summaries from different partitions
  )
  .map(p => (p._1, p._2.mean))        // average probability vector per mid
The assumption was that this would, ideally, result in 2000 tasks. However, it had a result I didn't expect. This image (same as before) shows the entire job that this part belongs to. The interesting point is that the active stage still shows 200 tasks, while the 2000 partitions from the repartition right before the RDD conversion only show up in the task count of the pending map stage.
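To confirm where the repartition actually takes effect, the partition counts on both sides of the conversion could be checked (a sanity-check sketch, not something from the original post):

// Partitions of the DataFrame before the explicit repartition; here this is
// apparently 200, matching the default of spark.sql.shuffle.partitions.
println(selectedPredictions.rdd.getNumPartitions)

// Partitions after .repartition(2000).rdd; the extra exchange only shows up
// in the stage that consumes this RDD, not in the stage that is currently stuck.
println(predictionRdd.getNumPartitions)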
It feels to me that, to increase the speed of this part, I would need to increase the number of tasks being performed, allowing more of the work to run in parallel with less memory used per task.
So my questions are basically:
- Am I understanding the situation at least somewhat correctly, or does the problem lie somewhere else entirely?
- Would increasing the number of tasks being performed also improve the speed?
- How can the number of tasks (or partitions) be increased for this section (one idea I'm unsure about is sketched below), or in what other way can I increase the speed?
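For completeness, the only other knob I know of is the numPartitions overload of aggregateByKey, though I suspect it only controls the partitioning of the aggregation's output rather than the 200-task stage that is stuck:

val avgPredictions = predictionRdd
  .map(row => (row.getAs[String]("mid"), row.getAs[OldVector]("probabilityOldVector")))
  // Same aggregation as above, but explicitly asking for 2000 partitions on
  // the reduce side (untested, shown only as an illustration).
  .aggregateByKey(new MultivariateOnlineSummarizer, numPartitions = 2000)(
    (agg, v) => agg.add(v),
    (agg1, agg2) => agg1.merge(agg2)
  )
  .map(p => (p._1, p._2.mean))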
I'm still fairly new to Spark; I know my way around it at a high level, but the actual intricacies still elude me.
At the time of writing I noticed it finally shows some progress after about 1.3 hours, for something seemingly simple.
Below are small parts of the aggregated metrics by executors and by task:
| Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Killed Tasks | Succeeded Tasks | Shuffle Read Size / Records | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) |
|---|---|---|---|---|---|---|---|---|---|---|
| 1 | - | 1.4 h | 1 | 0 | 0 | 1 | 1810.3 MB / 8527038 | 2.1 GB / 2745175 | 5.9 GB | 1456.3 MB |
| 10 | - | 0 ms | 0 | 0 | 0 | 0 | 1808.2 MB / 8515093 | 0.0 B / 1839668 | 5.9 GB | 1456.7 MB |
and
| Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | Scheduler Delay | Task Deserialization Time | GC Time | Result Serialization Time | Getting Result Time | Peak Execution Memory | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 19454 | 0 | RUNNING | PROCESS_LOCAL | 197 / worker176.hathi.surfsara.nl | 2017/05/29 17:23:37 | 1.5 h | 0 ms | 0 ms | 3.8 min | 0 ms | 0 ms | 3.1 GB | 1809.9 MB / 8525371 | | 0.0 B / 1839667 | 5.9 GB | 1456.0 MB | |
| 1 | 19455 | 0 | SUCCESS | PROCESS_LOCAL | 85 / worker134.hathi.surfsara.nl | 2017/05/29 17:23:37 | 1.5 h | 42 ms | 8 s | 3.2 min | 0 ms | 0 ms | 6.0 GB | 1808.3 MB / 8519686 | 5 s | 2.1 GB / 2742924 | 5.9 GB | 1456.3 MB | |
Here are several other screenshots for context, added as links so as not to make this post too long:
Stages 45 and 46 run simultaneously right before stage 47. The main part of the source code can be viewed in this snippet on GitHub. It loads a previously trained CrossValidatorModel consisting of a pipeline with five steps: CharNGram, CountVectorizer, IDF, RandomForestClassifier and IndexToString. It predicts the probabilities of 200 classes for approximately 550 million text snippets with a maximum length of 550 characters. These predictions are then grouped per input class and averaged.
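In case it helps, the scoring part of that snippet presumably boils down to something like the following (the model path, input DataFrame and column names are illustrative placeholders, not copied from the actual code):

import org.apache.spark.ml.tuning.CrossValidatorModel

// Load the previously trained cross-validated pipeline model and score the
// text snippets; "mid" identifies the input row and "probability" holds the
// 200-class probability vector.
val model = CrossValidatorModel.load("/path/to/model")   // placeholder path
val selectedPredictions = model
  .transform(snippetsDf)                                 // snippetsDf: placeholder input DataFrame
  .select("mid", "probability")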
Comments:

200 is the default value for the number of partitions in Spark SQL for shuffles; see spark.sql.shuffle.partitions. Can you attach a screenshot so we can see the execution plan? I'm sure you'll have an Exchange somewhere, which is a shuffle step. – Jacek Laskowski
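For reference, a minimal sketch of raising that setting (it has to be set before the action that triggers the shuffle):

// In code, on the SparkSession:
spark.conf.set("spark.sql.shuffle.partitions", "2000")

// Or when submitting the job:
//   spark-submit ... --conf spark.sql.shuffle.partitions=2000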
The plan does indeed contain Exchange parts. Unfortunately, changing the number of partitions doesn't appear to have a direct effect on the speed in my case, so it looks like the issue lies somewhere else. – fdorssers

[…] as[T]. They won't get many optimizations given issues.apache.org/jira/browse/SPARK-14083 – Jacek Laskowski

You could replace .filter($"snippet".isNotNull) with personAnnotationSnippetDs.na.drop (on lines 131-133). Why do you .rdd on line 147 and work with Row at 152? Is this for MultivariateOnlineSummarizer? Do you need line 152 at all? Why do you .coalesce(128) on line 185? – Jacek Laskowski
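For reference, the replacement suggested in the last comment would look roughly like this (personAnnotationSnippetDs and the snippet column come from the comment; the exact call is an assumption):

// Original style: keep rows whose "snippet" column is not null.
val filtered = personAnnotationSnippetDs.filter($"snippet".isNotNull)

// Suggested alternative: drop rows with nulls in "snippet" via the na functions.
val dropped = personAnnotationSnippetDs.na.drop(Seq("snippet"))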