3
votes

When I try to read a parquet folder that is currently being written to by another Spark streaming job, using the option "mergeSchema": "true":

val df = spark
    .read
    .option("mergeSchema", "true")
    .parquet("path.parquet")

I get an error:

java.io.IOException: Could not read footer for file

Without schema merging I can read the folder fine, but is it possible to read such a folder with schema merging enabled, regardless of other jobs that may be updating it?

Full exception:

java.io.IOException: Could not read footer for file: FileStatus{path=hdfs://path.parquet/part-00000-20199ef6-4ff8-4ee0-93cc-79d47d2da37d-c000.snappy.parquet; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:551)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:538)
    at org.apache.spark.util.ThreadUtils$$anonfun$3$$anonfun$apply$1.apply(ThreadUtils.scala:287)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: hdfs://path.parquet/part-00000-20199ef6-4ff8-4ee0-93cc-79d47d2da37d-c000.snappy.parquet is not a Parquet file (too small length: 0)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
    ... 9 more
What do you need mergeSchema for? - Jacek Laskowski
@JacekLaskowski to combine parquet files with different field sets because of schema evolution - cringineer
What is the output format of the streaming job in "written with another spark streaming job,"? Can you include the entire exception? Is the streaming job up and running while you're trying to read the files in a batch job? - Jacek Laskowski
Just checked your use case with a sample streaming query and a batch query and all worked fine. How do you start the streaming query? How do you start the batch job? - Jacek Laskowski
It looks like Spark ignores incomplete files without schema merging, but with this option enabled it tries to read the incomplete files, which results in the exception - cringineer

1 Answer

7
votes

Run the following before creating your DataFrame:

spark.sql("set spark.sql.files.ignoreCorruptFiles=true")

i.e. enable the config spark.sql.files.ignoreCorruptFiles

As stated here, if this config is true, Spark jobs will continue to run when encountering corrupted or non-existent files, and the contents that have been read will still be returned. This config is also used by the schema merge flow.

This config is available in Spark 2.1.1 and later.
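
Putting it together, a minimal sketch (spark.conf.set is just the programmatic equivalent of the spark.sql("set ...") statement above, and "path.parquet" stands in for your actual path, as in the question):

// Skip zero-length / partially written part files instead of failing on them.
// Equivalent to: spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// The footer read for in-progress files no longer aborts the job;
// schema merging only considers the files that could actually be read.
val df = spark
  .read
  .option("mergeSchema", "true")
  .parquet("path.parquet")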