7
votes

The purpose of this is in order to manipulate and save a copy of each data file in a second location in HDFS. I will be using

RddName.coalesce(1).saveAsTextFile(pathName)

to save the result to HDFS.

This is why I want to do each file separately even though I am sure the performance will not be as efficient. However, I have yet to determine how to store the list of CSV file paths into an array of strings and then loop through each one with a separate RDD.

Let us use the following anonymous example as the HDFS source locations:

/data/email/click/date=2015-01-01/sent_20150101.csv
/data/email/click/date=2015-01-02/sent_20150102.csv
/data/email/click/date=2015-01-03/sent_20150103.csv

I know how to list the file paths using Hadoop FS Shell:

HDFS DFS -ls /data/email/click/*/*.csv

I know how to create one RDD for all the data:

val sentRdd = sc.textFile( "/data/email/click/*/*.csv" )
3

3 Answers

10
votes

I haven't tested it thoroughly but something like this seems to work:

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.fs.{FileSystem, Path, LocatedFileStatus, RemoteIterator}
import java.net.URI

val path: String = ???

val hconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hconf)
val iter = hdfs.listFiles(new Path(path), false)

def listFiles(iter: RemoteIterator[LocatedFileStatus]) = {
  def go(iter: RemoteIterator[LocatedFileStatus], acc: List[URI]): List[URI] = {
    if (iter.hasNext) {
      val uri = iter.next.getPath.toUri
      go(iter, uri :: acc)
    } else {
      acc
    }
  }
  go(iter, List.empty[java.net.URI])
}

listFiles(iter).filter(_.toString.endsWith(".csv"))
1
votes

This is what ultimately worked for me:

import org.apache.hadoop.fs._
import org.apache.spark.deploy.SparkHadoopUtil
import java.net.URI

val hdfs_conf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfs_conf)
// source data in HDFS
val sourcePath = new Path("/<source_location>/<filename_pattern>")

hdfs.globStatus( sourcePath ).foreach{ fileStatus =>
   val filePathName = fileStatus.getPath().toString()
   val fileName = fileStatus.getPath().getName()

   // < DO STUFF HERE>

} // end foreach loop
-1
votes

sc.wholeTextFiles(path) should help. It gives an rdd of (filepath, filecontent).