I've got some Spark code that reads two files from HDFS (a header file and a body file), coalesces the RDD[String] down to a single partition, then writes the result as a compressed file using the GzipCodec:
spark.sparkContext.textFile("path_to_header.txt,path_to_body.txt")
  .coalesce(1)
  .saveAsTextFile("output_path", classOf[GzipCodec])
This works 100% as expected. We're now being asked to also support ZIP compression for Windows users, who can't natively decompress *.gz files. Since Hadoop doesn't ship a ZIP codec, I'm attempting to roll my own.
However, when I run the code I'm hitting a "ZipException: no current ZIP entry" exception:
Exception occured while exporting org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 2 times, most recent failure: Lost task 0.1 in stage 16.0 (TID 675, xxxxxxx.xxxxx.xxx, executor 16): java.util.zip.ZipException: no current ZIP entry
at java.util.zip.ZipOutputStream.write(Unknown Source)
at io.ZipCompressorStream.write(ZipCompressorStream.java:23)
at java.io.DataOutputStream.write(Unknown Source)
at org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.writeObject(TextOutputFormat.java:81)
at org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.write(TextOutputFormat.java:102)
at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:95)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1205)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1348)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
I've created a ZipCodec class that extends DefaultCodec:
import java.io.*;
import java.util.zip.ZipOutputStream;
import org.apache.hadoop.io.compress.*;

public class ZipCodec extends DefaultCodec {
    @Override
    public CompressionOutputStream createOutputStream(final OutputStream out, final Compressor compressor) throws IOException {
        return new ZipCompressorStream(new ZipOutputStream(out));
    }
}
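I'm assuming I'll also need to override getDefaultExtension() in ZipCodec (DefaultCodec reports ".deflate") so the part files actually end up with a .zip suffix, along these lines:

@Override
public String getDefaultExtension() {
    return ".zip";
}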
I've also created a ZipCompressorStream that extends CompressorStream:
import java.io.IOException;
import java.util.zip.ZipOutputStream;
import org.apache.hadoop.io.compress.CompressorStream;

public class ZipCompressorStream extends CompressorStream {

    public ZipCompressorStream(final ZipOutputStream out) {
        super(out);
    }

    // Pass writes straight through to the underlying ZipOutputStream
    @Override
    public void write(final int b) throws IOException {
        out.write(b);
    }

    @Override
    public void write(final byte[] data, final int offset, final int length) throws IOException {
        out.write(data, offset, length);
    }
}
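From the ZipOutputStream Javadoc, write() throws "no current ZIP entry" whenever no entry has been opened via putNextEntry() first, and nothing in Hadoop's write path is ever going to call that. So my current guess is that the compressor stream itself needs to open an entry before the first byte is written and close it out when Hadoop finishes the stream. Here's a rough sketch of what I think is missing (the entry name "part" is just a placeholder, and I haven't worked out how to derive it from the real output file name):

import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.hadoop.io.compress.CompressorStream;

public class ZipCompressorStream extends CompressorStream {

    private final ZipOutputStream zipOut;
    private boolean entryOpen = false;

    public ZipCompressorStream(final ZipOutputStream out) {
        super(out);
        this.zipOut = out;
    }

    // ZipOutputStream refuses writes until an entry is opened, so open one
    // lazily before the first byte goes out.
    private void ensureEntry() throws IOException {
        if (!entryOpen) {
            zipOut.putNextEntry(new ZipEntry("part")); // placeholder entry name
            entryOpen = true;
        }
    }

    @Override
    public void write(final int b) throws IOException {
        ensureEntry();
        out.write(b);
    }

    @Override
    public void write(final byte[] data, final int offset, final int length) throws IOException {
        ensureEntry();
        out.write(data, offset, length);
    }

    @Override
    public void finish() throws IOException {
        // Close the entry and write the central directory; close() will
        // then close the underlying stream.
        if (entryOpen) {
            zipOut.closeEntry();
        }
        zipOut.finish();
    }

    @Override
    public void resetState() throws IOException {
        // No Compressor to reset: we went through the protected
        // CompressorStream(OutputStream) constructor, so compressor is null.
    }
}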
We're currently using Spark 1.6.0 and Hadoop 2.6.0-cdh5.8.2.
Any thoughts at all?
Thanks in advance!