4
votes

I am using pyspark-2.4.0 and a large job keeps crashing with the following error message (either when saving to parquet or when trying to collect the result):

py4j.protocol.Py4JJavaError: An error occurred while calling o2495.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 184 in stage 290.0 failed 4 times, most recent failure: Lost task 184.3 in stage 290.0 (TID 17345, 53.62.154.250, executor 5): org.xerial.snappy.SnappyIOException: [EMPTY_INPUT] Cannot decompress empty stream
    at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:94)
    at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
    at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:164)
    at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
    at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
    at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:698)
    at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:696)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:696)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:820)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:875)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

My problem is that I do not know which operation is causing the issue. The error message does not give any indication about this and the stack trace does not contain any of my custom code.

Any ideas what could cause this, or how I can find where exactly the job keeps failing?

1
Did you fix it or find a solution? - Kedar Prabhu
I added some additional checks to prevent empty dataframes from being written out. After that, the problem apparently did not surface again. I wouldn't call it a real "solution" though. - Matthias

1 Answer

4
votes

I came across a link when I searched online: http://mail-archives.apache.org/mod_mbox/spark-issues/201903.mbox/%3CJIRA.13223720.1553507879000.125409.1553586300107@Atlassian.JIRA%3E

The summary of it is:

Spark 2.4 uses snappy-java 1.1.7.x, which behaves differently from the 1.1.2.x 
used in Spark 2.0.x. In 1.1.2.x, SnappyOutputStream always writes a Snappy header, 
whether or not any value is written. In 1.1.7.x, SnappyOutputStream does not generate 
a header if you never write a value to it. So in Spark 2.4, if an RDD caches an empty 
value, the memoryStore does not cache any bytes (no Snappy header), and decompression 
then throws the empty-stream error. 
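The workaround mentioned in the comments above (skipping empty dataframes before they are written out) could look roughly like the sketch below. It is only an illustration: the helper name write_if_nonempty and the output path are made up, and it guards against a completely empty dataframe only, so I can't say whether it covers every case described in the JIRA issue.

```python
def write_if_nonempty(df, path):
    """Write df to parquet at path unless it has no rows.

    Hypothetical helper (not part of PySpark). Returns True if the
    write was triggered, False if the dataframe was empty and skipped.
    """
    # head(1) returns a list with at most one Row, so it avoids a full count()
    if not df.head(1):
        return False
    df.write.parquet(path)
    return True
```

You would call it as write_if_nonempty(result_df, "/some/output/path"), where result_df and the path stand in for your own dataframe and location.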

Also, if you figure out a solution (other than downgrading Spark to 2.0.x), do let us know here.