I have a Spark job that joins 2 datasets, performs some transformations, and reduces the data to produce output. The input size for now is pretty small (200MB per dataset), but after the join, as you can see in the DAG, the job gets stuck and never proceeds to stage 4. I waited for hours; it eventually threw an OOM and showed failed tasks for stage 4.
- Why doesn't Spark show stage 4 (the data transformation stage) as active after stage 3 (the join stage)? Is it stuck in the shuffle between stages 3 and 4?
- What can I do to improve the performance of my Spark job? I tried increasing the shuffle partitions (see the snippet right below) and still got the same result.
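This is roughly what I did for the shuffle partitions (the value 400 is just an example I picked, not a recommendation):

spark.conf.set("spark.sql.shuffle.partitions", "400") // affects the DataFrame join/groupBy shuffles

// Note: as far as I understand, the RDD stages ignore spark.sql.shuffle.partitions;
// reduceByKey takes its own partition count instead, e.g. .reduceByKey(_ + _, 400)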
Job code:
joinedDataset.groupBy("group_field")
.agg(collect_set("name").as("names")).select("names").as[List[String]]
.rdd. //converting to rdd since I need to use reduceByKey
.flatMap(entry => generatePairs(entry)) // this line generates pairs of words out of input text, so data size increases here
.map(pair => ((pair._1, pair._2), 1))
.reduceByKey(_+_)
.sortBy(entry => entry._2, ascending = false)
.coalesce(1)
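For what it's worth, I also sketched a version that stays in the DataFrame API, wrapping my existing generatePairs in a UDF so the aggregation never leaves the DataFrame shuffle path (untested; it assumes generatePairs takes a List[String] and returns the word pairs):

import org.apache.spark.sql.functions._

// hypothetical wrapper around my existing generatePairs helper
val pairsUdf = udf((names: Seq[String]) => generatePairs(names.toList))

joinedDataset
  .groupBy("group_field")
  .agg(collect_set("name").as("names"))
  .select(explode(pairsUdf(col("names"))).as("pair")) // one row per generated pair
  .groupBy("pair")
  .count()                // replaces map + reduceByKey
  .orderBy(desc("count")) // replaces sortBy(ascending = false)

I don't know yet whether this is faster in practice; I dropped the coalesce(1) at the end on purpose, since collapsing everything to a single partition was another suspect.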
FYI: my cluster has 3 worker nodes, each with 16 cores and 100GB RAM, and 3 executors with 16 cores each (a 1:1 ratio with machines for simplicity) and 64GB memory allocated.
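For reference, the submit command looks roughly like this (flags reconstructed from the numbers above; class name and jar are placeholders):

spark-submit \
  --class com.example.PairsJob \
  --num-executors 3 \
  --executor-cores 16 \
  --executor-memory 64g \
  --conf spark.sql.shuffle.partitions=400 \
  my-job.jar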
UPDATE: Turns out the data generated in my job is pretty huge. I did some optimisations (strategically reduced the input data and removed some duplicated strings from processing), and now the job finishes within 3 hours. The input for stage 4 is 200MB and the output is about 200GB. It uses parallelism properly, but it struggles with the shuffle: the shuffle spill during this job was 1825 GB (memory) and 181 GB (disk). Can someone help me reduce the shuffle spill and the duration of the job? Thanks.
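These are the shuffle-related knobs I'm experimenting with now (the values are illustrative guesses, not a known fix):

spark.conf.set("spark.sql.shuffle.partitions", "2000") // more, smaller shuffle tasks

// or via spark-submit:
// --conf spark.memory.fraction=0.8          (default 0.6; more execution memory before spilling)
// --conf spark.shuffle.file.buffer=1m       (default 32k; fewer disk writes per shuffle file)
// --conf spark.reducer.maxSizeInFlight=96m  (default 48m; bigger fetch buffers on the reduce side)
// and for the RDD stage: .reduceByKey(_ + _, 2000) to spread the big shuffle wider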