0
votes

I have a Spark job which joins 2 datasets, performs some transformations, and reduces the data to produce output. The input is pretty small for now (about 200MB per dataset), but after the join, as you can see in the DAG, the job gets stuck and never proceeds to stage 4. I tried waiting for hours; it eventually threw an OOM and showed failed tasks for stage 4.

[Screenshots: Spark UI DAG, Task Summary]

  1. Why doesn't Spark show stage 4 (the data transformation stage) as active after stage 3 (the join stage)? Is it stuck in the shuffle between stages 3 and 4?
  2. What can I do to improve the performance of my Spark job? I tried increasing the shuffle partitions (see the snippet below) and still got the same result.
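
For reference, this is how I bumped the shuffle partitions (a sketch; spark here is my SparkSession and 800 is only an example value):

spark.conf.set("spark.sql.shuffle.partitions", "800")  // applies to the DataFrame groupBy/agg shuffle
// Note: the RDD operations (reduceByKey/sortBy) follow spark.default.parallelism /
// the parent partitioning, not this setting.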

Job code:


joinedDataset.groupBy("group_field")
    .agg(collect_set("name").as("names")).select("names").as[List[String]]
      .rdd                                      // converting to RDD since I need to use reduceByKey
      .flatMap(entry => generatePairs(entry))   // generates pairs of words out of the input text, so the data size increases here
      .map(pair => ((pair._1, pair._2), 1))
      .reduceByKey(_ + _)
      .sortBy(entry => entry._2, ascending = false)
      .coalesce(1)

FYI: my cluster has 3 worker nodes, each with 16 cores and 100GB RAM, and 3 executors with 16 cores each (a 1:1 ratio with the machines for simplicity) and 64GB of memory allocated.
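
Roughly how that sizing maps to Spark configuration (a sketch only: the app name is made up, and depending on the cluster manager these settings might be passed on the submit command instead):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("pair-count-job")                 // hypothetical name
  .config("spark.executor.instances", "3")   // one executor per worker node
  .config("spark.executor.cores", "16")
  .config("spark.executor.memory", "64g")
  .getOrCreate()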

UPDATE: It turns out the data generated in my job is pretty huge. I did some optimisations (strategically reduced the input data and removed some duplicated strings from processing), and now the job finishes within 3 hours. The input to stage 4 is 200MB and the output is about 200GB. The job uses parallelism properly but performs badly in the shuffle: the shuffle spill during this job was 1825 GB (memory) and 181 GB (disk). Can someone help me reduce the shuffle spill and the duration of the job? Thanks.
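
For what it's worth, this is the direction I'm considering to keep the heavy stages parallel (a sketch only: the partition count and output path are placeholders, and if I still need a single file I'd merge the part files outside Spark):

import org.apache.spark.sql.functions.collect_set
import spark.implicits._                            // needed for .as[List[String]]

// Sketch, not my current job: give the reduce an explicit partition count and drop
// coalesce(1) so the reduce and the sort stay parallel; merge the part files afterwards.
val pairCounts = joinedDataset.groupBy("group_field")
  .agg(collect_set("name").as("names"))
  .select("names").as[List[String]]
  .rdd
  .flatMap(entry => generatePairs(entry))
  .map(pair => ((pair._1, pair._2), 1))
  .reduceByKey(_ + _, 400)                          // 400 is only a placeholder partition count

pairCounts
  .sortBy(entry => entry._2, ascending = false)
  .saveAsTextFile("hdfs:///tmp/pair_counts")        // placeholder path; many part files, merged outside Spark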

1
why coalesce(1)? – thebluephantom
Just because I want the output in 1 file, no other reason. – Saurabh
Works fine for me, but on a different Spark version. – thebluephantom
May I know what you tried? Did your application hang before stage 4? – Saurabh
Ran something (small volume) on Databricks; I do not believe the answer. Can you run with a smaller volume as a test? – thebluephantom

1 Answer

0
votes

Try an initial sort on the executors and then reduce + sort:

joinedDataset.groupBy("group_field")
    .agg(collect_set("name").as("names")).select("names").as[List[String]]
      .rdd                                      // converting to RDD since I need to use reduceByKey
      .flatMap(entry => generatePairs(entry))   // generates pairs of words out of the input text, so the data size increases here
      .map(pair => ((pair._1, pair._2), 1))
      .sortBy(entry => entry._2, ascending = false) // do an initial sort on the executors
      .reduceByKey(_ + _)
      .sortBy(entry => entry._2, ascending = false)
      .coalesce(1)