13
votes

I have zip files that I would like to open 'through' Spark. I can open .gz files with no problem because of Hadoop's native codec support, but am unable to do so with .zip files.

Is there an easy way to read a zip file in your Spark code? I've also searched for zip codec implementations to add to the CompressionCodecFactory, but have been unsuccessful so far.
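For reference, this is the sort of thing that already works for gzip thanks to the codec support (the path here is purely illustrative):

# .gz files decompress transparently through Hadoop's native codec support
lines = sc.textFile("hdfs:/Testing/data.txt.gz")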


7 Answers

22
votes

There was no solution with Python code here when I recently had to read zips in PySpark, and while searching for how to do that I came across this question. So hopefully this will help others.

import zipfile
import io

def zip_extract(x):
    # x is a (path, bytes) pair from binaryFiles; wrap the raw bytes
    # so ZipFile can treat them as an in-memory file
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    files = file_obj.namelist()
    # map each inner file name to its decompressed contents
    return dict(zip(files, [file_obj.open(file).read() for file in files]))


zips = sc.binaryFiles("hdfs:/Testing/*.zip")
files_data = zips.map(zip_extract).collect()

In the code above I return a dictionary with each file name inside the zip as a key and the text contents of that file as the value. You can change it however you want to suit your purposes. Note that collect() pulls everything onto the driver; a distributed variant is sketched below.
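A minimal sketch of that variant, assuming the same binaryFiles input (the helper name zip_extract_pairs is mine, not from the original answer):

import zipfile
import io

def zip_extract_pairs(x):
    # hypothetical helper: emit one (inner_filename, bytes) pair per file
    # in the archive instead of building a single dict per zip
    in_memory_data = io.BytesIO(x[1])
    with zipfile.ZipFile(in_memory_data, "r") as file_obj:
        for name in file_obj.namelist():
            yield (name, file_obj.open(name).read())

# result stays an RDD of (filename, bytes); no collect() required
files_rdd = sc.binaryFiles("hdfs:/Testing/*.zip").flatMap(zip_extract_pairs)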

5
votes

Please try the code below:

sparkContext.newAPIHadoopRDD(
    hadoopConf,
    InputFormat.class,
    ImmutableBytesWritable.class,
    Result.class)

(Note: ImmutableBytesWritable and Result are HBase classes carried over from an HBase example; for zip files, substitute a zip-capable InputFormat and its matching key/value classes.)
4
votes

@user3591785 pointed me in the correct direction, so I marked his answer as correct.

For a bit more detail: I searched for "ZipFileInputFormat Hadoop" and came across this link: http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/

Taking the ZipFileInputFormat and its helper ZipFileRecordReader class, I was able to get Spark to open and read the zip file perfectly.

    JavaPairRDD<Text, Text> rdd1 = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());

The result was a map with one element: the file name as the key and the content as the value. I needed to transform this into a JavaPairRDD. I'm sure you could replace Text with BytesWritable if you want, and replace the ArrayList with something else, but my goal was to first get something running.

JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {

    @Override
    public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
        List<Tuple2<String, String>> newList = new ArrayList<Tuple2<String, String>>();

        // copyBytes() trims to the valid length; getBytes() can return stale trailing bytes
        InputStream is = new ByteArrayInputStream(textTextTuple2._2.copyBytes());
        BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));

        String line;
        while ((line = br.readLine()) != null) {
            // key each line by its first tab-separated column
            Tuple2<String, String> newTuple = new Tuple2<String, String>(line.split("\\t")[0], line);
            newList.add(newTuple);
        }
        return newList;
    }
});
4
votes

I've had a similar issue and solved it with the following code:

sparkContext.binaryFiles("/pathToZipFiles/*")
    .flatMap { case (zipFilePath, zipContent) =>

        val zipInputStream = new ZipInputStream(zipContent.open())

        Stream.continually(zipInputStream.getNextEntry)
            .takeWhile(_ != null)
            .flatMap { zipEntry => ??? } // process each ZipEntry here
    }
4
votes

This answer collects the knowledge from the previous answers, and I add my own experience.

ZipFileInputFormat

I tried following the answers from @Tinku and @JeffLL, using the imported ZipFileInputFormat together with the sc.newAPIHadoopFile API, but it did not work for me. And I do not know how I would get the com-cotdp-hadoop lib onto my production cluster; I am not responsible for its setup.

ZipInputStream

@Tiago Palma gave good advice, but he did not finish his answer and I struggled for quite some time to actually get the decompressed output.

Before I could do that, I had to work through all the theoretical aspects, which you can find in my answer: https://stackoverflow.com/a/45958182/1549135

But the missing part of the mentioned answer is reading the ZipEntry:

import java.util.zip.ZipInputStream
import java.io.{BufferedReader, InputStreamReader}
import org.apache.spark.input.PortableDataStream

sc.binaryFiles(path, minPartitions)
      .flatMap { case (name: String, content: PortableDataStream) =>
        val zis = new ZipInputStream(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  // ZipInputStream signals end-of-stream at each entry boundary,
                  // so readLine() returns null once the current entry is exhausted
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}
2
votes
Using the API sparkContext.newAPIHadoopRDD(hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class), the file name should be passed in via the conf:

Configuration conf = new Job().getConfiguration();
conf.set(PROPERTY_NAME, "zip file address");   // PROPERTY_NAME comes from your input format
sparkContext.newAPIHadoopRDD(conf, ZipFileInputFormat.class, Text.class, Text.class);

Look up the PROPERTY_NAME in your input format to set the path.
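For illustration, a hedged PySpark version of the same idea; the class names assume the com-cotdp-hadoop jar is on the classpath, and the property name assumes the input format extends the new-API FileInputFormat (both are my assumptions, not from the answer above):

# both the FQCN and the property name below are assumptions; see note above
conf = {"mapreduce.input.fileinputformat.inputdir": "hdfs:/Testing"}
rdd = sc.newAPIHadoopRDD(
    "com.cotdp.hadoop.ZipFileInputFormat",   # assumed zip-aware input format
    "org.apache.hadoop.io.Text",             # key class
    "org.apache.hadoop.io.Text",             # value class
    conf=conf)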

0
votes

Try:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.text("yourGzFile.gz")
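Note this relies on the native gzip codec mentioned in the question; it will not decompress .zip archives. For actual zips you would still go through binaryFiles, e.g. this hedged sketch that builds a DataFrame (the helper and column names are my own, echoing the first answer):

import io
import zipfile

def zip_to_rows(x):
    # hypothetical helper: one (filename, text) row per file in the archive
    file_obj = zipfile.ZipFile(io.BytesIO(x[1]), "r")
    for name in file_obj.namelist():
        yield (name, file_obj.open(name).read().decode("utf-8"))

pairs = spark.sparkContext.binaryFiles("yourZipFile.zip").flatMap(zip_to_rows)
df = spark.createDataFrame(pairs, ["filename", "content"])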