1 vote

I have a DataFrame that I am writing to an S3 bucket target location. The code uses coalesce() when writing the data out, and it fails with a SparkOutOfMemoryError. The current code, which uses coalesce(), is shared across multiple projects. I have seen many solutions recommend repartition() instead, and that worked for me, but coalesce() fails even when the DataFrame has zero records. Is there any other way to resolve this issue without changing to repartition()?

Code:

empsql = 'Select * From Employee'
df = spark.sql(empsql)  # Spark session is already configured
# Write out as 2 files; note that 'delimiter' and 'header' are CSV options and are ignored by the Parquet writer
df.coalesce(2).write.mode('overwrite').format("parquet").option("delimiter", '|').save(s3_path, header=True)

Error:

org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply
    at org.apache.spark.scheduler.ResultTask.runTask
    at org.apache.spark.scheduler.Task.run
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply
    at org.apache.spark.util.Utils$.tryWithSafeFinally
    at org.apache.spark.executor.Executor$TaskRunner.run
    at java.util.concurrent.ThreadPoolExecutor.runWorker
    at java.util.concurrent.ThreadPoolExecutor$Worker.run
    at java.lang.Thread.run
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 44 bytes of memory, got 0
    at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:)
    at org.apache.spark.memory.MemoryConsumer.allocatePage
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:241)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
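For reference, the repartition() variant that worked is presumably something like this (a sketch, not the exact project code; the target of 2 output partitions mirrors the coalesce(2) above):

# repartition(2) performs a full shuffle into 2 roughly even partitions,
# instead of merging existing partitions the way coalesce(2) does
df.repartition(2).write.mode('overwrite').format("parquet").save(s3_path)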

2 Comments
Can we see the Spark UI stages tab? Maybe that will give us more information. – Carlos David Peña
You could verify your executor configuration and also check in the Spark UI whether there is any data skew that could be causing this issue. – Nikunj Kakadiya
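One way to check for skew from code rather than the Spark UI is to count the rows in each partition of the DataFrame being written (a sketch using spark_partition_id(), not code from the thread):

from pyspark.sql.functions import spark_partition_id

# A few very large partition counts next to many small ones indicates skew
df.groupBy(spark_partition_id().alias("partition_id")).count().orderBy("count", ascending=False).show()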

2 Answers

0 votes

Not sure if this will work for you, but try doing this:

# DataFrame.coalesce() has no shuffle flag; that option exists only on the RDD API,
# so drop to the RDD, coalesce with shuffle=True, and rebuild the DataFrame
shuffled = spark.createDataFrame(df.rdd.coalesce(2, shuffle=True), df.schema)
shuffled.write.mode('overwrite').format("parquet").option("delimiter", '|').save(s3_path, header=True)

shuffle=True adds a shuffle step, so the partitions are computed in parallel. The behaviour is similar to using repartition().

0 votes

I solved my issue by adding repartition(key) on a well-distributed key before coalesce(). I think it lets Spark send pre-ordered data to the writer node instead of doing the sorting on the writer executor.
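A minimal sketch of that pattern, assuming a well-distributed column such as a hypothetical emp_id:

# repartition on a well-distributed key first, then coalesce down for the write;
# "emp_id" is only an assumed example column, not from the original post
df.repartition("emp_id").coalesce(2).write.mode("overwrite").format("parquet").save(s3_path)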