13
votes

After repartitioning a DataFrame in Spark 1.3.0, I get a Parquet exception when saving to Amazon's S3.

logsForDate
    .repartition(10)
    .saveAsParquetFile(destination) // <-- Exception here

The exception I receive is:

java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN
at parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:137)
at parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:129)
at parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:173)
at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152)
at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:635)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I would like to know what the problem is and how to solve it.

Do you get the error every time or just sometimes? Do you also get it for smaller files? Do you only get it on S3 or other file systems as well? Have you tried Apache Spark 1.3.1? Its release notes mention some Parquet-related fixes. - Daniel Darabos
I get the error all the time when working above a certain file size. I have only tried S3. I have tried 1.3.0.d. - Interfector
I am able to reproduce this error with Spark 1.3.1 on EMR, writing to S3. Using the old Parquet api (sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")) does not help. Writing to HDFS works fine. - Eric Eijkelenboom
Did you try using a bucket in the us-west-1 region, or using EMRFS? - Gaurav Shah
@Interfector were you able to solve this problem? I have the same issue. - User12345

3 Answers

4
votes

I can actually reproduce this problem with Spark 1.3.1 on EMR, when saving to S3.

However, saving to HDFS works fine. You could save to HDFS first, and then use e.g. s3distcp to move the files to S3.
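
A rough sketch of that workaround (the HDFS staging path and bucket name below are hypothetical, and s3-dist-cp is assumed to be available on the EMR master):

logsForDate
    .repartition(10)
    .saveAsParquetFile("hdfs:///tmp/logsForDate.parquet") // write to HDFS first

// then, from the shell on the EMR master (not Scala), copy the output to S3:
//   s3-dist-cp --src hdfs:///tmp/logsForDate.parquet --dest s3://my-bucket/logsForDate.parquet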

1
vote

I faced this error when calling saveAsParquetFile into HDFS. It was caused by the datanode socket write timeout, so I changed it to a longer value in the Hadoop settings:

<property>
  <name>dfs.datanode.socket.write.timeout</name>
  <value>3000000</value>
</property>
<property>
  <name>dfs.socket.timeout</name>
  <value>3000000</value>
</property> 

Hope this helps, if you can set a similar timeout for S3.
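
If editing the Hadoop config files is not an option, here is a sketch of setting the same properties programmatically on the Hadoop configuration that Spark uses (assuming sc is your SparkContext; I have not verified whether this helps when writing to S3):

// apply the longer socket timeouts (in milliseconds) without touching hdfs-site.xml
sc.hadoopConfiguration.set("dfs.datanode.socket.write.timeout", "3000000")
sc.hadoopConfiguration.set("dfs.socket.timeout", "3000000")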

1
vote

Are you sure this is not due to SPARK-6351 ("Wrong FS" upon saving parquet to S3)? If it is, then it has nothing to do with repartitioning, and it has been fixed in Spark 1.3.1. If, however, like me, you are stuck with Spark 1.3.0 because you are using CDH 5.4.0, last night I figured out a way to work around it directly from the code (no config-file change):

spark.hadoopConfiguration.set("fs.defaultFS", "s3n://mybucket")

After that, I could save parquet files to S3 without problem.
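
For context, a minimal sketch of how this fits around the code from the question (assuming sc is your SparkContext; the bucket and destination path are hypothetical):

sc.hadoopConfiguration.set("fs.defaultFS", "s3n://mybucket") // set before writing

logsForDate
    .repartition(10)
    .saveAsParquetFile("s3n://mybucket/path/to/destination")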

Note, however, that this has several drawbacks. I think (though I didn't try) that it will then fail to write to any filesystem other than S3, and perhaps also to another bucket. It might also force Spark to write temporary files to S3 rather than locally, but I haven't checked that either.