I have two RDDs of the form:

data_wo_header: RDD[String], data_test_wo_header: RDD[String]
scala> data_wo_header.first
res2: String = 1,2,3.5,1112486027 
scala> data_test_wo_header.first
res2: String = 1,2 

RDD2 is smaller than RDD1. I am trying to filter RDD1 by removing the elements that match (by regex) an element of RDD2.

The 1,2 in the example above represents UserID,MovID. Since that pair is present in the test set, I want a new RDD in which it has been removed from RDD1.

I asked a similar question before, but the answer required an unnecessary split of the RDD. I am trying to do something like this, but it's not working:

  def create_training(data_wo_header: RDD[String], data_test_wo_header: RDD[String]): List[String] = {

    var ratings_train = new ListBuffer[String]()
    data_wo_header.foreach(x => {
      data_test_wo_header.foreach(y => {
        if (x.indexOf(y) == 0) {
          ratings_train += x
        }
      })
    })
    val ratings_train_list = ratings_train.toList
    return ratings_train_list
  }

How should I do a regex match and filter based on it?

1 Answer

You can collect rdd2 into a broadcast variable to share it with the executors, and then filter rdd1 against the broadcast value. I replicated your code, and this works for me:

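import org.apache.spark.rdd.RDD

def create_training(data_wo_header: RDD[String], data_test_wo_header: RDD[String]): List[String] = {

   // Collect the smaller RDD on the driver and broadcast it to the executors.
   val rdd2array = data_wo_header.sparkContext.broadcast(data_test_wo_header.collect().map(_.trim))
   // Keep only the rows whose "UserID,MovID," prefix matches no test pair.
   // A prefix check is used because x.matches(y) succeeds only if y matches the whole line.
   val training_set = data_wo_header.filter { x =>
       !rdd2array.value.exists(y => x.startsWith(y + ","))
   }
   training_set.collect().toList
}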
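
One thing to watch in the filter predicate: String.matches succeeds only when the pattern matches the entire string, so the pattern 1,2 does not match the line 1,2,3.5,1112486027. Here is a minimal sketch in plain Scala (no Spark needed) that compares it with a prefix check. The object and method names are hypothetical, and every row other than the first is made-up sample data:

```scala
object PrefixFilterDemo {
  // Drops every rating line that starts with one of the "UserID,MovID" test pairs.
  // The trailing comma stops the pair "1,2" from also removing "1,20,...".
  def dropTestPairs(ratings: Seq[String], testPairs: Seq[String]): Seq[String] = {
    val pairs = testPairs.map(_.trim) // guard against trailing whitespace
    ratings.filterNot(x => pairs.exists(y => x.startsWith(y + ",")))
  }

  def main(args: Array[String]): Unit = {
    val ratings = Seq("1,2,3.5,1112486027", "1,20,4.0,1112486100", "2,5,3.0,1112486200")
    val testPairs = Seq("1,2")

    // Whole-string matching never fires, so nothing is removed.
    val viaMatches = ratings.filterNot(x => testPairs.exists(y => x.matches(y)))
    println(viaMatches.size) // prints 3

    // The prefix check removes only the "1,2,..." row.
    println(dropTestPairs(ratings, testPairs))
  }
}
```

The same predicate can be passed unchanged to RDD.filter, since it only looks at one line at a time.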

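Also, with Scala and Spark I recommend avoiding foreach where possible and using a more functional style with the map, flatMap and filter functions. In your version the ListBuffer is mutated inside closures that run on the executors, so the driver's copy is generally not updated, and Spark does not support calling one RDD's operations from inside another RDD's foreach.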
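
As a rough illustration of that style, the sketch below extracts the UserID,MovID key from each line and looks it up in a broadcast Set, returning an RDD instead of collecting to a List. It assumes the test set fits in driver memory and that every ratings line has the form UserID,MovID,rating,timestamp. The names TrainingSplit, pairKey and createTraining are hypothetical, and the local-mode session and second sample row are there only to make the example runnable:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TrainingSplit {
  // Extracts "UserID,MovID" from a line such as "UserID,MovID,rating,timestamp".
  def pairKey(line: String): String = line.split(",", 3).take(2).mkString(",")

  // Keeps the ratings whose (UserID, MovID) pair does not appear in the test set.
  def createTraining(ratings: RDD[String], test: RDD[String]): RDD[String] = {
    // Assumption: the test set is small enough to collect on the driver.
    val testKeys = ratings.sparkContext.broadcast(test.map(_.trim).collect().toSet)
    ratings.filter(line => !testKeys.value.contains(pairKey(line)))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("training-split").getOrCreate()
    val sc = spark.sparkContext

    val ratings = sc.parallelize(Seq("1,2,3.5,1112486027", "1,20,4.0,1112486100"))
    val test = sc.parallelize(Seq("1,2"))

    createTraining(ratings, test).collect().foreach(println)
    spark.stop()
  }
}
```

Returning the RDD keeps the training set distributed; call collect() on the result only if it is known to fit on the driver.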