0
votes

In Spark, I have an RDD that contains millions of paths to local files (we have a shared file system, so they appear local). In Scala, how would I create an RDD that consists of all the lines in each of those files?

I tried doing something like this:

paths.flatMap(path => sc.textFile(path))

But that didn't work. I also tried something like this:

paths.flatMap(path => 
    scala.io.Source.fromInputStream(new java.io.FileInputStream(path)).getLines
)

That worked when working locally but didn't when running on multiple machines. I ended up with this error:

java.nio.charset.MalformedInputException: Input length = 1
    at java.nio.charset.CoderResult.throwException(CoderResult.java:277)

Any pointers would be appreciated

(Most solutions point so far involve passing the list of files to sc.textFile all at once, which is not possible since the list can be very large. A typical use case right now would yield 20M paths, which doesn't fit in a single Java String).

2

2 Answers

2
votes

This:

paths.flatMap(path => sc.textFile(path))

simply cannot compile not to mention work because RDDs are not TraversableOnce

Error you see when reading files directly (java.nio.charset.MalformedInputException) is not Spark related and is thrown when file has incorrect encoding. To quote MalformedInputException documentation:

Checked exception thrown when an input byte sequence is not legal for given charset, or an input character sequence is not a legal sixteen-bit Unicode sequence. You can solve it providing a codec for fromInputStream method:

def fromInputStream(is: InputStream)(implicit codec: Codec)

and using Codec.onMalformedInput with desired CodingErrorAction

See for example Source.fromInputStream exception handling during reading lines.

Moreover you should handle IO exceptions when you read data directly for example by wrapping reading block with Try.

Spark itself supports reading complete directory trees. There is no reason to pass individual files, only the list of top level directories and use correct settings of mapreduce.input.fileinputformat.input.dir.recursive configuration. It is also possible to pass multiple root directories as a comma-separated string:

sc.textFile("/foo,/bar")

You can also use wildcards to read arbitrary list of files and directories:

sc.textFile(Seq(
  "/foo/bar/*",
  "/bar/*/*.txt"
).mkString(","))

Finally reading large number of small files is inefficient due to way how input splits are computed. Instead of using textFiles you should consider reading with a subclass of the CombineFileInputFormat for example:

sc.hadoopFile(
  paths,
  classOf[CombineTextInputFormat],
  classOf[LongWritable], classOf[Text]
).map(_._2.toString)

Finally you can union multiple RDDs as suggested by @GameOfThrows but it shouldn't be done iteratively without checkpointing to avoid issues with long lineages. Use SparkContext.union instead and control the number of partitions.

2
votes

If they were in a directory, then it's probably better to read the whole directory

 sc.textFile("/path/to/directory/") 

which will merge all the files into a single RDD, look out for mem constraints. Alternatively you can try a map then reduce:

paths.map(path => sc.textFile(path)).reduce(_.union(_))

Or even better as zero323 suggested:

SparkContext.union(paths.map(...))