19
votes

It seems that SparkContext.textFile expects only files to be present in the given directory location - it does not

  • (a) recurse into subdirectories, or
  • (b) even support directories (it tries to read directories as files)

Any suggestions on how to structure the recursion - ideally something simpler than building the recursive file list / descent logic by hand?

Here is the use case: files under

/data/tables/my_table

I want to be able to read, with a single call, all the files at all directory levels under that parent directory in HDFS.
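For reference, this is roughly what the manual descent I'm hoping to avoid would look like - a sketch only, assuming the standard Hadoop FileSystem API and that textFile accepts a comma-separated list of paths:

<!-- language: scala -->

    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    // Walk the tree under the parent directory and collect every file path.
    def listFilesRecursively(fs: FileSystem, dir: Path): Seq[Path] =
      fs.listStatus(dir).toSeq.flatMap { status =>
        if (status.isDirectory) listFilesRecursively(fs, status.getPath)
        else Seq(status.getPath)
      }

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val files = listFilesRecursively(fs, new Path("/data/tables/my_table"))

    // textFile takes a comma-separated list of paths.
    val rdd = sc.textFile(files.mkString(","))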

UPDATE

sc.textFile() invokes the Hadoop FileInputFormat via its subclass TextInputFormat. Inside, the logic to read directories does exist - i.e. it first detects whether an entry is a directory and, if so, descends into it:

<!-- language: java -->
    for (FileStatus globStat: matches) {
      if (globStat.isDir()) {
        for (FileStatus stat: fs.listStatus(globStat.getPath(), inputFilter)) {
          result.add(stat);
        }
      } else {
        result.add(globStat);
      }
    }

However, when invoking sc.textFile there are errors on directory entries: "not a file". This behavior is confusing, given that support for handling directories appears to be in place.

2
@JustinPihony Thanks I am not using S3 - so it is unclear if that answer applies. - WestCoastProjects
Can you provide an example directory layout? Is the globbing syntax not sufficient? e.g. textFile(/path/*/*) - Nick Chammas
@NickChammas No the globbing does not work: it does not descend and also any directory encountered generates an error "not a file" - WestCoastProjects

2 Answers

40
votes

I was looking at an old version of FileInputFormat.

BEFORE setting the recursive config mapreduce.input.fileinputformat.input.dir.recursive

    scala> sc.textFile("dev/*").count
    java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build

The default is null/not set, which is evaluated as "false":

    scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
    res1: String = null

AFTER:

Now set the value:

    sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

Now retry the recursive operation:

    scala> sc.textFile("dev/*/*").count
    ..
    res5: Long = 3481

So it works.

Update: added /* to the path for full recursion, per the comment by @Ben.
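Applied to the directory layout from the question, the same two steps would look roughly like this (the glob depth is illustrative - adjust it to the actual nesting):

<!-- language: scala -->

    // Tell the underlying FileInputFormat to list input directories recursively,
    // then read everything under the parent directory.
    sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

    val lines = sc.textFile("/data/tables/my_table/*/*")
    println(lines.count())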

2
votes

I have found that these parameters must be set in the following way:

.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")