How can I perform word count of multiple files present in a directory using Apache Spark with Scala?

All the files are newline-delimited (one word per line).

The expected output is:

file1.txt,5
file2.txt,6 ...

I tried the following approach:

val rdd = spark.sparkContext.wholeTextFiles("file:///C:/Datasets/DataFiles/")
val cnt = rdd.map(m => ((m._1, m._2), 1)).reduceByKey((a, b) => a + b)

The output I'm getting is:

((file:/C:/Datasets/DataFiles/file1.txt,apple
orange
bag
apple
orange),1)
((file:/C:/Datasets/DataFiles/file2.txt,car
bike
truck
car
bike
truck),1)

I tried sc.textFile() first, but it didn't give me the filename. wholeTextFiles() returns key-value pairs in which the key is the filename, but I still couldn't get the desired output.

1 Answer

You are on the right track, but you need to work on your solution a bit more.

The method sparkContext.wholeTextFiles(...) gives you (file, contents) pairs. In your code the key you reduce by is the whole (file, contents) pair, and each of those keys occurs exactly once, so reduceByKey leaves you with a count of 1 per file rather than a word count.
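
To see why, here is a minimal plain-Scala sketch (no Spark needed; the file names and contents are made up to mirror the question) that mimics what reduceByKey does with those pairs:

// Mimic the ((file, contents), 1) pairs produced by the map in the question.
val pairs = Seq(
  (("file1.txt", "apple\norange\nbag\napple\norange"), 1),
  (("file2.txt", "car\nbike\ntruck\ncar\nbike\ntruck"), 1)
)
// reduceByKey sums the counts per distinct key, but each (file, contents) key
// appears exactly once, so every sum stays at 1 -- the output seen in the question.
val reduced = pairs.groupBy(_._1).map { case (key, vs) => (key, vs.map(_._2).sum) }
reduced.foreach(println)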

In order to count the words in each file, you need to break each file's contents into individual words so you can count them.

Let's do that, starting by reading the file directory:

import org.apache.spark.rdd.RDD
val files: RDD[(String, String)] = spark.sparkContext.wholeTextFiles("file:///C:/Datasets/DataFiles/")

That gives you one row per file, along with the full file contents. Now let's break the file contents into individual items. Given that your files seem to have one word per line, this is easy to do by splitting on line breaks:

val wordsPerFile: RDD[(String, Array[String])] = files.mapValues(_.split("\n"))
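
One thing to watch: the C:/ path suggests the files may have Windows line endings (\r\n), in which case splitting only on "\n" would leave a stray carriage return on every word. A slightly more defensive split (the regex and the empty-line filter are just a suggestion) could be:

// Handle both \n and \r\n line endings and drop blank lines.
val wordsPerFileRobust: RDD[(String, Array[String])] =
  files.mapValues(_.split("\\r?\\n").filter(_.trim.nonEmpty))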

Now we just need to count the number of items in each of those Array[String] values:

val wordCountPerFile: RDD[(String, Int)] = wordsPerFile.mapValues(_.size)

And that's basically it. It's worth mentioning, though, that the word counting is not distributed at all (it just operates on an Array[String]) because you are loading the whole contents of each file at once.
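
For completeness, here is one way the pieces might be tied together end to end and printed in the file1.txt,5 format asked for in the question. The object name, the local[*] master, and the use of collect (fine for a handful of small files) are assumptions made just to keep the sketch self-contained:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object WordCountPerFile {
  def main(args: Array[String]): Unit = {
    // Local session just for this sketch; the app name and master are assumptions.
    val spark = SparkSession.builder()
      .appName("word-count-per-file")
      .master("local[*]")
      .getOrCreate()

    // (filePath, fileContents) pairs, one per file in the directory.
    val files: RDD[(String, String)] =
      spark.sparkContext.wholeTextFiles("file:///C:/Datasets/DataFiles/")

    // One word per line, so counting non-empty lines counts words.
    val wordCountPerFile: RDD[(String, Int)] =
      files.mapValues(_.split("\\r?\\n").count(_.trim.nonEmpty))

    // Keep only the base file name and print lines like "file1.txt,5".
    wordCountPerFile
      .map { case (path, count) => s"${path.split("/").last},$count" }
      .collect()
      .foreach(println)

    spark.stop()
  }
}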