
I am fairly new to Spark and Scala, so please bear with me if this is a beginner question.

I have a directory test which contains two files, input1.txt and input2.txt. Now, let's say I create an RDD called inputRDD using

val inputRDD = sc.wholeTextFiles("/home/hduser/test")

which loads both files into the pair RDD inputRDD.

Based on my understanding, inputRDD contains the file name as the key and the file contents as the value, something like this:

(input1.txt,contents of input1.txt)
(input2.txt,contents of input2.txt)
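One refinement to this picture: wholeTextFiles uses the full path of each file as the key, not just the bare name. With a local path the key typically looks like file:/home/hduser/test/input1.txt (the exact prefix depends on which filesystem the path resolves to). The small plain Scala sketch below shows one way to recover the bare file name from such a key; the helper name fileName and the example keys are assumptions for illustration, not part of the Spark API.

```scala
object WholeTextFilesKeys {
  // Hypothetical helper: keep only the last path segment of a wholeTextFiles key,
  // dropping the directory and any scheme prefix such as "file:" or "hdfs://host:port".
  def fileName(path: String): String =
    path.substring(path.lastIndexOf('/') + 1)

  def main(args: Array[String]): Unit = {
    // Assumed example keys; real keys depend on the filesystem in use.
    val keys = Seq(
      "file:/home/hduser/test/input1.txt",
      "file:/home/hduser/test/input2.txt"
    )
    keys.map(fileName).foreach(println) // prints input1.txt, then input2.txt
  }
}
```

This is also why filtering on the key with contains("input1.txt") works, whereas an exact comparison with "input1.txt" would not match.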

Now, let's say I have to join the two files (which are in the same RDD) on their first column. Their contents are:

contents of input1.txt
----------------------
1 a
1 b
2 c
2 d

contents of input2.txt
----------------------
1 e
2 f
3 g

How can I do that?
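To pin down the expected result independently of Spark, here is a plain Scala sketch (no Spark dependency) that parses the two sample contents into (key, value) pairs and performs an inner join on the first column, which is what PairRDDFunctions.join computes. The object and method names are illustrative only, and splitting on whitespace is an assumption about the file format.

```scala
object JoinSketch {
  // Parse "key value" lines into (key, value) pairs, skipping blank or malformed lines.
  def parse(content: String): Seq[(String, String)] =
    content.split("\n").toSeq
      .map(_.trim.split("\\s+"))
      .filter(_.length >= 2)
      .map(fields => (fields(0), fields(1)))

  // Inner join on the key: every left value is paired with every right value
  // that shares its key; keys present on only one side are dropped.
  def innerJoin(
      left: Seq[(String, String)],
      right: Seq[(String, String)]
  ): Seq[(String, (String, String))] = {
    val rightByKey = right.groupBy(_._1)
    for {
      (k, v1) <- left
      (_, v2) <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (v1, v2))
  }

  def main(args: Array[String]): Unit = {
    val input1 = "1 a\n1 b\n2 c\n2 d"
    val input2 = "1 e\n2 f\n3 g"
    // prints (1,(a,e)), (1,(b,e)), (2,(c,f)), (2,(d,f)); key 3 has no partner in input1
    innerJoin(parse(input1), parse(input2)).foreach(println)
  }
}
```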

Are you expecting more text files in your test directory? If yes, do you want to join them too? If yes, how? - Ramesh Maharjan
@RameshMaharjan For now I have only two, and I am not quite sure how to perform a join in this case because they are in the same RDD. It would have been straightforward if they were in different RDDs. - Rahul Kumar
See my answer below :) I guess that's what you want. - Ramesh Maharjan
@RameshMaharjan Thank you :) But is there a way to do something similar when the file names are not known? In a real-world scenario with many small files, giving the exact file names would get messy! - Rahul Kumar
Updated my answer :) Please check. - Ramesh Maharjan

2 Answers


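First split each file's content into lines and each line into a (key, value) pair, then use reduceByKey to combine the values that share a key. Note that this concatenates the values from both files into one string per key, and key 3 (present only in input2.txt) is kept too, so the result is a grouping rather than a relational join. Something like below: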

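val outputRDD = inputRDD
  // one record per line, across both files (the file path key is not needed here)
  .flatMap { case (_, content) => content.split("\n") }
  // split each line into its two columns: (1, a) (1, b) (2, c) ...
  .map(line => line.trim.split(" "))
  .filter(fields => fields.length >= 2)
  .map(fields => (fields(0), fields(1)))
  // concatenate all values sharing a key, e.g. (1, abe); the concatenation order is not guaranteed
  .reduceByKey(_ + _)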
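To see what this pipeline produces on the sample data, here is a local stand-in written with plain Scala collections, assuming both file contents are available as strings. groupBy followed by reduce(_ + _) plays the role of reduceByKey(_ + _); unlike Spark, the local version keeps a deterministic order. The object and method names are hypothetical.

```scala
object ReduceByKeySketch {
  // Local stand-in for flatMap + map + reduceByKey(_ + _) over all file contents.
  def concatByKey(contents: Seq[String]): Map[String, String] =
    contents
      .flatMap(_.split("\n").toSeq)          // one element per line, across all files
      .map(_.trim.split("\\s+"))             // split each line into columns
      .filter(_.length >= 2)                 // skip blank or malformed lines
      .map(fields => (fields(0), fields(1))) // (key, value)
      .groupBy(_._1)                         // gather the values of each key
      .map { case (k, pairs) => k -> pairs.map(_._2).reduce(_ + _) }

  def main(args: Array[String]): Unit = {
    val merged = concatByKey(Seq("1 a\n1 b\n2 c\n2 d", "1 e\n2 f\n3 g"))
    merged.toSeq.sortBy(_._1).foreach(println) // (1,abe), (2,cdf), (3,g)
  }
}
```

Key 3 survives with value g even though it has no match in input1.txt, which is the main behavioral difference from join.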

If you have only two files in your test directory and their file names are known, you can separate the text of the two files into two RDDs and use join as below:

val rdd1 = inputRDD.filter(tuple => tuple._1.contains("input1.txt"))
  .flatMap(tuple => tuple._2.split("\n"))
  .map(line => line.split(" "))
  .map(array => (array(0), array(1)))

val rdd2 = inputRDD.filter(tuple => tuple._1.contains("input2.txt"))
  .flatMap(tuple => tuple._2.split("\n"))
  .map(line => line.split(" "))
  .map(array => (array(0), array(1)))


rdd1.join(rdd2).collect().foreach(println) // collect() brings the results to the driver so println output appears there

You should get output like the following (the order of the lines may vary):

(2,(c,f))
(2,(d,f))
(1,(a,e))
(1,(b,e))

I hope this is what you want.

Updated

If there are two files in the test directory whose names are unknown, you can skip the wholeTextFiles API and use the textFile API to read them as separate RDDs, then join them as above. For that, you will need a function that lists the files. Note that getListOfFiles uses java.io.File, so it only sees the local filesystem, not HDFS.

import java.io.File

def getListOfFiles(dir: String):List[File] = {
  val d = new File(dir)
  if (d.exists && d.isDirectory) {
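    // listFiles returns entries in no particular order, so sort by name
    // to make fileList(0) and fileList(1) deterministic
    d.listFiles.filter(_.isFile).sortBy(_.getName).toList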
  } else {
    List[File]()
  }
}

val fileList = getListOfFiles("/home/hduser/test")

val rdd1 = sc.textFile(fileList(0).getPath)
  .map(line => line.split(" "))
  .map(array => (array(0), array(1)))

val rdd2 = sc.textFile(fileList(1).getPath)
  .map(line => line.split(" "))
  .map(array => (array(0), array(1)))

rdd1.join(rdd2).collect().foreach(println) // collect() brings the results to the driver so println output appears there
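Another option for the unknown-names case, not part of the original answer, is to keep wholeTextFiles and discover the names from the RDD itself: in Spark, inputRDD.keys.collect() returns the file paths to the driver, and each path can then be used in a filter like the ones in the first version of this answer. The plain Scala sketch below models that idea on a local Seq of (path, content) pairs so it runs without Spark. joinFirstTwo and parse are hypothetical names, and taking the first two paths in sorted order is an assumption.

```scala
object UnknownNamesSketch {
  // Parse "key value" lines into (key, value) pairs, skipping blank or malformed lines.
  def parse(content: String): Seq[(String, String)] =
    content.split("\n").toSeq
      .map(_.trim.split("\\s+"))
      .filter(_.length >= 2)
      .map(fields => (fields(0), fields(1)))

  // Given (path, content) pairs shaped like wholeTextFiles output, inner-join the
  // first two files in path order without knowing their names in advance.
  def joinFirstTwo(files: Seq[(String, String)]): Seq[(String, (String, String))] = {
    val paths = files.map(_._1).distinct.sorted // plays the role of inputRDD.keys.collect()
    require(paths.size >= 2, "expected at least two files")
    def pairsFor(path: String) =
      files.filter(_._1 == path).flatMap { case (_, content) => parse(content) }
    val left = pairsFor(paths(0))
    val rightByKey = pairsFor(paths(1)).groupBy(_._1)
    for {
      (k, v1) <- left
      (_, v2) <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (v1, v2))
  }

  def main(args: Array[String]): Unit = {
    // Assumed sample data in wholeTextFiles form; the paths are illustrative.
    val files = Seq(
      ("file:/home/hduser/test/input2.txt", "1 e\n2 f\n3 g"),
      ("file:/home/hduser/test/input1.txt", "1 a\n1 b\n2 c\n2 d")
    )
    joinFirstTwo(files).foreach(println)
  }
}
```

Collecting the keys is cheap because it returns only the paths, not the file contents.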