
I have an HDFS location, and there is a zip file inside that location

HDFS location /development/staging/b8baf3f4-abce-11eb-8592-0242ac110032/records.zip

 scala> val loc = "/development/staging/b8baf3f4-abce-11eb-8592-0242ac110032/"
 loc: String = "/development/staging/b8baf3f4-abce-11eb-8592-0242ac110032/"

 scala> val rdd = sc.textFile(loc)
 rdd: org.apache.spark.rdd.RDD[String] = /development/staging/b8baf3f4-abce-11eb-8592-0242ac110032/ MapPartitionsRDD[1] at textFile at <console>:26

  scala> rdd.take(2)
  res0: Array[String] = Array(PK????????]R�R��*�????�??? ???2972120.dat�S�r�0? 
   ��*�0����?t?�]T�Ж??����
 `�6ط�kU;P�M�� rSO�;G��p��?��?�Z1^3@�^�� ��F��ٕb�?~,ٖ 
 �u6�D��'�@�??��L*�Gp?�kcL�7!r�p1�1e�� a*.{? 
   �.;��������s�(�)�, ?�=�9U<"*!?5��?;�?�?�مd{h} 
  ��gG���� �?�Z)

but it produces garbled output instead of the file contents.

Can you help me with how to read a file inside a zip file using a Spark RDD? There is only one file inside my zip file.

Does this answer your question? How to open/stream .zip files through Spark? – Partha Deb
Looking for a solution in Scala code – Surender Raja

1 Answer


Are you looking for something like this:

import java.io.{File, FileInputStream, FileOutputStream, IOException}
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Unzip the file and copy its contents out to a new location
object Unzip extends App {

  val INPUT_ZIP_FILE: String = "src/resources/my-zip.zip"
  val OUTPUT_FOLDER: String = "src/resources/my-zip"

  def unZipIt(zipFile: String, outputFolder: String): Unit = {

    val buffer = new Array[Byte](1024)

    try {
      // create the output directory if it does not exist
      val folder = new File(outputFolder)
      if (!folder.exists()) {
        folder.mkdir()
      }

      // open the zip file and walk its entries
      val zis: ZipInputStream = new ZipInputStream(new FileInputStream(zipFile))
      var ze: ZipEntry = zis.getNextEntry()

      while (ze != null) {
        val fileName = ze.getName()
        val newFile = new File(outputFolder + File.separator + fileName)

        println("file unzip : " + newFile.getAbsoluteFile())

        // create parent folders for the entry
        new File(newFile.getParent()).mkdirs()

        // copy the entry's bytes out of the zip stream
        val fos = new FileOutputStream(newFile)
        var len: Int = zis.read(buffer)
        while (len > 0) {
          fos.write(buffer, 0, len)
          len = zis.read(buffer)
        }

        fos.close()
        ze = zis.getNextEntry()
      }

      zis.closeEntry()
      zis.close()

    } catch {
      case e: IOException => println("exception caught: " + e.getMessage)
    }
  }

  unZipIt(INPUT_ZIP_FILE, OUTPUT_FOLDER)

  val sac = new SparkContext("local[*]", "first Program")
  val sqlc = new SQLContext(sac)
  val rdd = sac.textFile("src/resources/my-zip/sample.txt")
  rdd.take(1).foreach(println)
}

Not sure if this achieves what you want to do, but maybe it could help a bit!
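Since the file lives in HDFS, another option, along the lines of the linked question "How to open/stream .zip files through Spark?", is to read the raw zip bytes with `sc.binaryFiles` and decompress them on the executors, so nothing is unpacked to the local disk. A minimal sketch, where `readZipEntryLines` is a hypothetical helper (not part of Spark) and the path is the asker's staging directory:

```scala
import java.io.ByteArrayInputStream
import java.util.zip.ZipInputStream
import scala.io.Source

// Hypothetical helper: given the raw bytes of a zip archive, read all lines
// of its first (and, per the question, only) entry.
def readZipEntryLines(zipBytes: Array[Byte]): List[String] = {
  val zis = new ZipInputStream(new ByteArrayInputStream(zipBytes))
  try {
    if (zis.getNextEntry != null) Source.fromInputStream(zis).getLines().toList
    else Nil
  } finally zis.close()
}

// With Spark, sc.binaryFiles yields (path, PortableDataStream) pairs, so the
// helper can run per file on the executors (assumes a live SparkContext sc):
// val lines = sc.binaryFiles("/development/staging/b8baf3f4-abce-11eb-8592-0242ac110032/")
//   .flatMap { case (_, stream) => readZipEntryLines(stream.toArray()) }
// lines.take(2).foreach(println)
```

Note that `binaryFiles` loads each whole file onto one executor, so this suits many small zips better than one very large archive.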