50
votes

I have a directory of directories on HDFS, and I want to iterate over the directories. Is there any easy way to do this with Spark using the SparkContext object?

9
Do you mean 'iterate' as in getting the list of sub-directories and files within, or as in getting all files across all subdirectories? - maasg
Iterate as in list all the sub-directories. Each subdirectory contains a bunch of text files that I want to process in different ways. - Jon

9 Answers

56
votes

You can use org.apache.hadoop.fs.FileSystem. Specifically, FileSystem.listFiles([path], true)

And with Spark...

FileSystem.get(sc.hadoopConfiguration).listFiles(..., true)

Edit

It's worth noting that it is good practice to get the FileSystem associated with the Path's scheme:

path.getFileSystem(sc.hadoopConfiguration).listFiles(path, true)
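
For completeness, listFiles returns a RemoteIterator[LocatedFileStatus] rather than a Scala collection, so it has to be drained manually. A minimal sketch, assuming sc is your SparkContext; the parent path is illustrative:

import org.apache.hadoop.fs.{LocatedFileStatus, Path}

val parent = new Path("hdfs:///some/parent/dir")  // illustrative path
val fs = parent.getFileSystem(sc.hadoopConfiguration)

// true = recurse into sub-directories; listFiles yields files only, not directories
val it = fs.listFiles(parent, true)
while (it.hasNext) {
  val status: LocatedFileStatus = it.next()
  println(status.getPath)
}
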
41
votes

Here's a PySpark version, if someone is interested:

    hadoop = sc._jvm.org.apache.hadoop

    fs = hadoop.fs.FileSystem
    conf = hadoop.conf.Configuration() 
    path = hadoop.fs.Path('/hivewarehouse/disc_mrt.db/unified_fact/')

    for f in fs.get(conf).listStatus(path):
        print(f.getPath(), f.getLen())

In this particular case I get a list of all files that make up the disc_mrt.unified_fact Hive table.

Other methods of the FileStatus object, like getLen() to get the file size, are described here:

Class FileStatus

20
votes

import org.apache.hadoop.fs.{FileSystem, Path}

FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("hdfs:///tmp")).foreach(x => println(x.getPath))

This worked for me.

Spark version 1.5.0-cdh5.5.2
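
If you only need the sub-directories (as the question asks), you can filter the returned statuses. A minimal sketch along the same lines; the parent path is illustrative:

import org.apache.hadoop.fs.{FileSystem, Path}

// Keep only the directory entries under the parent path
FileSystem.get(sc.hadoopConfiguration)
  .listStatus(new Path("hdfs:///tmp"))
  .filter(_.isDirectory)
  .foreach(x => println(x.getPath))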

1
votes

This did the job for me:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

FileSystem.get(new URI("hdfs://HAservice:9000"), sc.hadoopConfiguration).listStatus(new Path("/tmp/")).foreach(x => println(x.getPath))

1
votes

@Tagar's answer didn't show how to connect to a remote HDFS, but this one does:

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration


fs = FileSystem.get(URI("hdfs://somehost:8020"), Configuration())

status = fs.listStatus(Path('/some_dir/yet_another_one_dir/'))

for fileStatus in status:
    print(fileStatus.getPath())
1
votes

I had some issues with the other answers (like 'JavaObject' object is not iterable), but this code works for me:

fs = spark_contex._jvm.org.apache.hadoop.fs.FileSystem.get(spark_contex._jsc.hadoopConfiguration())
i = fs.listFiles(spark_contex._jvm.org.apache.hadoop.fs.Path(path), False)
while i.hasNext():
    f = i.next()
    print(f.getPath())
1
votes

Scala FileSystem (Apache Hadoop Main 3.2.1 API)

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.mutable.ListBuffer

    val fileSystem: FileSystem = {
      val conf = new Configuration()
      conf.set("fs.defaultFS", "hdfs://to_file_path")
      FileSystem.get(conf)
    }

    val files = fileSystem.listFiles(new Path(path), false)
    val filenames = ListBuffer[String]()
    while (files.hasNext) filenames += files.next().getPath().toString()
    filenames.foreach(println(_))
0
votes

You can try with globStatus as well:

import java.net.URI

val listStatus = org.apache.hadoop.fs.FileSystem.get(new URI(url), sc.hadoopConfiguration).globStatus(new org.apache.hadoop.fs.Path(url))

for (urlStatus <- listStatus) {
  println("urlStatus get Path:" + urlStatus.getPath())
}
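
globStatus also accepts glob patterns, which makes it convenient for listing just the immediate sub-directories. A small sketch with an illustrative wildcard path:

import org.apache.hadoop.fs.{FileSystem, Path}

// The trailing /* matches the immediate children of the parent directory
// (the path itself is illustrative)
val statuses = FileSystem.get(sc.hadoopConfiguration).globStatus(new Path("/parent/dir/*"))
statuses.filter(_.isDirectory).foreach(s => println(s.getPath))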
0
votes

You can use the code below to iterate recursively through a parent HDFS directory, storing only sub-directories up to the third level. This is useful if you need to list all directories that were created by partitioning the data (in the code below, three columns were used for partitioning):

import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

def rememberDirectories(fs: FileSystem, path: List[Path]): List[Path] = {
  val buff = new ListBuffer[LocatedFileStatus]()

  path.foreach(p => {
    val iter = fs.listLocatedStatus(p)
    while (iter.hasNext()) buff += iter.next()
  })

  buff.toList.filter(p => p.isDirectory).map(_.getPath)
}

@tailrec
def getRelevantDirs(fs: FileSystem, p: List[Path], counter: Int = 1): List[Path] = {
  val levelList = rememberDirectories(fs, p)
  if(counter == 3) levelList
  else getRelevantDirs(fs, levelList, counter + 1)
}
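
A minimal usage sketch, starting from a hypothetical partitioned table root (the path is only an illustration, not part of the original answer):

// Collect the third-level partition directories under the table root
val partitionDirs = getRelevantDirs(fs, List(new Path("/data/events")))
partitionDirs.foreach(p => println(p.toString))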