
When using Spark Streaming and the built-in HDFS support, I have encountered the following inconvenience:

dStream.saveAsTextFiles produces many subdirectories in HDFS. rdd.saveAsTextFile also creates subdirectories for each set of parts.

I am looking for a method which puts all of the parts in the same path:

myHdfsPath/Prefix_time-part0XXX

instead of

myHdfsPath/Prefix_time/part0XXX

That way I can later iterate over these files more easily by scanning a single HDFS directory.


2 Answers


You can post-process the output of saveAsTextFile and merge the generated part files:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.rdd.RDD

def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
  // Write the RDD's parts to a temporary directory, then merge them into one file
  val sourceFile = hdfsServer + "/tmp/"
  rdd.saveAsTextFile(sourceFile)
  val dstPath = hdfsServer + "/final/"
  merge(sourceFile, dstPath, fileName)
}

def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  val destinationPath = new Path(dstPath)
  if (!hdfs.exists(destinationPath)) {
    hdfs.mkdirs(destinationPath)
  }
  // copyMerge concatenates all part files under srcPath into a single destination file
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
}

Another way is to collect the RDD and use the HDFS Java API to write a single file. But that requires the RDD to be small enough to fit on the driver, and as you know, collecting an RDD is inefficient.
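For illustration, a minimal sketch of that collect-then-write approach. The helper name, the destination path, and the assumption that a SparkContext already produced the RDD are all hypothetical; only use this on small RDDs:

    import java.io.{BufferedWriter, OutputStreamWriter}

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.rdd.RDD

    def collectAndWrite[T](rdd: RDD[T], destination: String): Unit = {
      // Only safe for small RDDs: collect() pulls every element onto the driver.
      val lines = rdd.collect()
      val fs = FileSystem.get(new Configuration())
      val out = fs.create(new Path(destination), true) // overwrite if the file exists
      val writer = new BufferedWriter(new OutputStreamWriter(out, "UTF-8"))
      try {
        lines.foreach { line =>
          writer.write(line.toString)
          writer.newLine()
        }
      } finally {
        writer.close()
      }
    }

This writes exactly one file at `destination`, with no part-file subdirectory, which matches the layout the question asks for.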

I hope this helps.


You need to call repartition(1) on your RDD before calling the save, so that all the data ends up in a single part file.

If you are using Spark Streaming, do this per batch inside foreachRDD: repartition each batch's RDD to one partition, then write it.