0 votes

I have an RDD that has the signature

org.apache.spark.rdd.RDD[java.io.ByteArrayOutputStream]

In this RDD, each row has its own partition.

This ByteArrayOutputStream is zip output. I am applying some processing to the data in each partition, and I want to export the processed data from each partition as a single zip file. What is the best way to export each row in the final RDD as one file per row on HDFS?

In case you are interested, here is how I ended up with such an RDD:

val npyData = transformedTopData.select("tokenIDF", "topLevelId").rdd
  .repartition(2)
  .mapPartitions(x => {
    // Collect the feature vectors from all rows in this partition
    val vectors = for {
      row <- x
    } yield row.getAs[Vector](0)
    // Pack the whole partition into a single zipped ByteArrayOutputStream
    Seq(ml2npyCSR(vectors.toSeq).zipOut).iterator
  })

EDIT: count() works perfectly fine:

scala> npyData.count()
res9: Long = 2
Have you tried to count it? I doubt the RDD is gonna work. - Jacek Laskowski
@JacekLaskowski Count works fine; updated the output in the question. - vumaasha
What is ml2npyCSR.zipOut doing? - Yuval Itzchakov
Right. It will work, but when you try to access the "records" of type java.io.ByteArrayOutputStream it may not. - Jacek Laskowski
@YuvalItzchakov It takes all the records in the partition and creates a CSR matrix in npz format. In simple terms, it is similar to zipping the rows within each partition. - vumaasha
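
Based on that description, zipOut presumably yields an in-memory zip archive. For illustration, here is a minimal sketch of how a byte payload (e.g. a serialized CSR matrix) could be packed into a single-entry zip held in a ByteArrayOutputStream; the method name, signature, and entry name are hypothetical, since the real ml2npyCSR is not shown in the question:

import java.io.ByteArrayOutputStream
import java.util.zip.{ZipEntry, ZipOutputStream}

// Hypothetical helper: wraps a byte payload in an in-memory, single-entry zip archive
def zipOut(entryName: String, payload: Array[Byte]): ByteArrayOutputStream = {
    val bos = new ByteArrayOutputStream()
    val zos = new ZipOutputStream(bos)
    zos.putNextEntry(new ZipEntry(entryName))
    zos.write(payload)
    zos.closeEntry()
    zos.close()
    bos
}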

2 Answers

0 votes

Spark has very little built-in support for file system operations. You'll need the Hadoop FileSystem API to create the individual files:

// This method is needed as the Hadoop Configuration object is not serializable
def createFileStream(pathStr: String) = {
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.fs.Path

    val hadoopConf = new Configuration()
    val fs = FileSystem.get(hadoopConf)
    val outFileStream = fs.create(new Path(pathStr))
    outFileStream
}

// Writes one item to its own file.
// Needs a unique id along with the object for output file naming.
def writeToFile(x: (java.io.ByteArrayOutputStream, Long)): Unit = {
    val (dataStream, id) = x
    val outputDir = "/tmp/del_a/"
    val outFileStream = createFileStream(outputDir + id)
    dataStream.writeTo(outFileStream)
    outFileStream.close()
}

// zipWithIndex is used to create a unique id for each item in the RDD
npyData.zipWithIndex().foreach(writeToFile)
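
A side note: createFileStream is called once per record above, which means a new FileSystem handle per file. If that overhead matters, a foreachPartition variant could create the handle once per partition; a minimal sketch, assuming the same /tmp/del_a output directory:

// Sketch: create the FileSystem once per partition instead of once per record
npyData.zipWithIndex().foreachPartition { iter =>
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(new Configuration())
    iter.foreach { case (dataStream, id) =>
        val out = fs.create(new Path("/tmp/del_a/" + id))
        dataStream.writeTo(out)
        out.close()
    }
}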

References:
Hadoop FileSystem example
ByteArrayOutputStream.writeTo(java.io.OutputStream)

0 votes

I figured out that I should represent my data as a PairRDD and implement a custom FileOutputFormat. I looked into the implementation of SequenceFileOutputFormat for inspiration and managed to write my own version based on that.

My custom FileOutputFormat is available here
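
For anyone who cannot follow the link, here is a minimal sketch of the general idea (not necessarily the exact linked implementation): a FileOutputFormat whose RecordWriter writes each (name, bytes) pair to its own file under the job's output directory. The class name, the key naming scheme, and writing directly to the output path are illustrative simplifications:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

// Writes every (key, value) record to its own file, named after the key
class BinaryFileOutputFormat extends FileOutputFormat[String, BytesWritable] {
    override def getRecordWriter(context: TaskAttemptContext): RecordWriter[String, BytesWritable] =
        new RecordWriter[String, BytesWritable] {
            override def write(key: String, value: BytesWritable): Unit = {
                val path = new Path(FileOutputFormat.getOutputPath(context), key)
                val out = path.getFileSystem(context.getConfiguration).create(path)
                out.write(value.getBytes, 0, value.getLength)
                out.close()
            }
            override def close(context: TaskAttemptContext): Unit = ()
        }
}

// Usage sketch: key each zip by its index and save through the custom format
npyData.zipWithIndex()
    .map { case (bos, id) => ("part-" + id + ".zip", new BytesWritable(bos.toByteArray)) }
    .saveAsNewAPIHadoopFile("/tmp/del_a", classOf[String], classOf[BytesWritable],
        classOf[BinaryFileOutputFormat])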