
I'm running into an issue when converting a DataFrame to an RDD. One stage in this process uses a total of 200 tasks, while most stages before it use a lot more, and I'm having trouble understanding why it uses this number and whether I need to find a way to increase it to improve performance.

The program uses Spark version 2.1.0 and runs on a YARN cluster with 250 executors.

These are the lines where the DF is converted to RDD:

val predictionRdd = selectedPredictions
    .withColumn("probabilityOldVector", convertToOldVectorUdf($"probability"))
    .select("mid", "probabilityOldVector")
    .rdd
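
(convertToOldVectorUdf itself is not shown here; its actual definition is in the snippet linked further down. A plausible sketch, assuming it only bridges the new ml.linalg.Vector to the old mllib.linalg.Vector that MultivariateOnlineSummarizer expects, would be:)

import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.sql.functions.udf

// Assumed definition: convert the new ml Vector into the old mllib Vector.
val convertToOldVectorUdf = udf { v: MLVector => OldVectors.fromML(v) }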

This results in the previously mentioned 200 tasks, as seen in the active stage in the following screenshot.

[Screenshot: active stage showing 200 tasks]

It basically just sits here for an unknown amount of time, while the two already finished stages used significantly more tasks.
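
For reference, the partition count behind those tasks can be read off directly; a minimal check, assuming a spark-shell session with the same predictionRdd in scope:

// Prints the number of partitions of the converted RDD (200 in this case).
println(predictionRdd.getNumPartitions)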

One thing I tried was performing a repartition right before converting it to RDD:

val predictionRdd = selectedPredictions
    .withColumn("probabilityOldVector", convertToOldVectorUdf($"probability"))
    .select("mid", "probabilityOldVector")
    .repartition(2000)
    .rdd

val avgPredictions = predictionRdd
    .map(row => (row.getAs[String]("mid"), row.getAs[OldVector]("probabilityOldVector")))
    .aggregateByKey(new MultivariateOnlineSummarizer)(
        (agg, v) => agg.add(v),           // add a probability vector to the per-key summarizer
        (agg1, agg2) => agg1.merge(agg2)  // merge partial summarizers across partitions
    )
    .map(p => (p._1, p._2.mean))          // keep the mean probability vector per key

The assumption was that this would, ideally, result in 2000 tasks being performed. However, this had a (for me) unexpected result. This image (same as before) shows the entire job that this part belongs to. The interesting point is that it still shows 200 tasks, and the 2000 partitions from the repartition right before the RDD conversion only show up in the number of tasks for the pending map stage.
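
One way to see where those shuffles sit is to print the physical plan; each Exchange operator marks a shuffle boundary, and its partition count is what shows up as the number of tasks. A minimal sketch using the same pipeline:

// Prints the physical plan; look for Exchange operators (shuffle boundaries).
selectedPredictions
    .withColumn("probabilityOldVector", convertToOldVectorUdf($"probability"))
    .select("mid", "probabilityOldVector")
    .repartition(2000)
    .explain()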

It feels to me that, to increase the speed of this part, I would need to increase the number of tasks being performed, allowing it to run with more parallelism and less memory used per task.

So my questions are basically:

  1. Am I understanding the situation at least somewhat correctly, or does the problem lie somewhere else entirely?
  2. Would increasing the number of tasks being performed also improve the speed?
  3. How can the number of tasks (or partitions) be increased for this section, or in what other way can I increase the speed?

I'm still fairly new to Spark; I know my way around it at a higher level, but the actual intricacies still elude me.

At the time of writing, I noticed that it finally showed some progress after about 1.3 hours, for something seemingly simple.

Below are small excerpts of the aggregated metrics, by executor and by task:

Executor ID  Address  Task Time  Total Tasks  Failed Tasks  Killed Tasks  Succeeded Tasks  Shuffle Read Size / Records  Shuffle Write Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)
1            -        1.4 h      1            0             0             1                1810.3 MB / 8527038          2.1 GB / 2745175              5.9 GB                  1456.3 MB 
10           -        0 ms       0            0             0             0                1808.2 MB / 8515093          0.0 B / 1839668               5.9 GB                  1456.7 MB 

and

Index  ID     Attempt  Status   Locality Level  Executor ID / Host                 Launch Time          Duration  Scheduler Delay  Task Deserialization Time  GC Time  Result Serialization Time  Getting Result Time  Peak Execution Memory  Shuffle Read Size / Records  Write Time  Shuffle Write Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
0      19454  0        RUNNING  PROCESS_LOCAL   197 / worker176.hathi.surfsara.nl  2017/05/29 17:23:37  1.5 h     0 ms             0 ms                       3.8 min  0 ms                       0 ms                 3.1 GB                 1809.9 MB / 8525371                      0.0 B / 1839667               5.9 GB                  1456.0 MB       
1      19455  0        SUCCESS  PROCESS_LOCAL   85 / worker134.hathi.surfsara.nl   2017/05/29 17:23:37  1.5 h     42 ms            8 s                        3.2 min  0 ms                       0 ms                 6.0 GB                 1808.3 MB / 8519686          5 s         2.1 GB / 2742924              5.9 GB                  1456.3 MB       

[Screenshot: DAG of the job]

Here are several other screenshots for context, added as links so as not to make this post too long:

Stages 45 and 46 run simultaneously right before stage 47. The main part of the source code can be viewed in this snippet on GitHub. It loads a previously trained CrossValidatorModel consisting of a pipeline with five steps: CharNGram, CountVectorizer, IDF, RandomForestClassifier and IndexToString. It predicts the probabilities of 200 classes for approximately 550 million text snippets with a maximum length of 550 characters. These predictions are then grouped per input class and averaged.
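
For context, the prediction step in that snippet is roughly of this shape (a sketch only; the path and input DataFrame are placeholders, the actual code is in the linked snippet):

import org.apache.spark.ml.tuning.CrossValidatorModel

// Placeholder path and input; see the linked snippet for the real values.
val model = CrossValidatorModel.load("hdfs:///path/to/crossValidatorModel")
val selectedPredictions = model.transform(snippets)  // snippets: DataFrame of text snippets
    .select("mid", "probability")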

200 is the default value for the number of partitions in Spark SQL for shuffles -- see spark.sql.shuffle.partitions. Can you attach a screenshot of the execution plan? I'm sure you'll have an Exchange somewhere, which is a shuffle step. – Jacek Laskowski

I hadn't expected an option like that to be hardcoded in the config. Thanks for the heads-up. Changing that does indeed change the number of tasks. I've also added an image of the DAG, and it does indeed have multiple Exchange parts. Unfortunately, changing the number of partitions doesn't appear to have a direct effect on the speed in my case, so it looks like the issue lies somewhere else. – fdorssers

Can you show Stage 46? I'm concerned about the three exchanges in one stage. What are you shuffling so much? – Jacek Laskowski

Can you remove the typed Dataset API usage (all operators after as[T])? They won't get many optimizations, given issues.apache.org/jira/browse/SPARK-14083. – Jacek Laskowski

Replace .filter($"snippet".isNotNull) with personAnnotationSnippetDs.na.drop (on lines 131-133). Why do you .rdd on line 147 and work with Row at 152? Is this for MultivariateOnlineSummarizer? Do you need line 152 at all? Why do you .coalesce(128) on line 185? – Jacek Laskowski

1 Answer


You can set the number of tasks using spark.sql.shuffle.partitions when building the session:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val spConfig = (new SparkConf).setMaster("local[*]").setAppName("MoviesRec")
// The Spark UI is available at port 4040; check there as well.
val spark = SparkSession.builder().appName("Movies").config(spConfig)
    .config("spark.ui.enabled", true)
    .config("spark.sql.shuffle.partitions", "100")
    .getOrCreate()

In the logs you will get something like this:

[Stage 9:======================================================> (98 + 2) / 100]
200 is the default for DataFrames. You can also check this in the Spark UI at localhost:4040: look at the number of partitions, the size of each partition, and the total RAM you have.
On my desktop, if I reduce the default number of partitions from 200 to 8 (the number of cores I have), it runs blazing fast. But setting partitions = number of cores may be simplistic, since a partition represents a block of data (64 MB by default). So one should go up the ladder until at least the whole memory is being used.
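
To make that last point concrete, here is a rough heuristic for deriving a partition count from the input size, using the 64 MB block size above as the per-partition target (the helper name is my own, not a Spark API):

// Rough heuristic: aim for roughly one partition per block of input data.
def suggestedPartitions(totalBytes: Long,
                        bytesPerPartition: Long = 64L * 1024 * 1024): Int =
  math.max(1, math.ceil(totalBytes.toDouble / bytesPerPartition).toInt)

// e.g. 500 GB of input => suggestedPartitions(500L * 1024 * 1024 * 1024) == 8000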