6
votes

I get a warning when using an RDD in a for comprehension, and I'm not sure if it's something I'm doing wrong. If I do this:

val sc = new SparkContext(...)

val anRDD = sc.parallelize(List(
  ("a", List(1, 2, 3)),
  ("b", List(4)),
  ("c", List(5, 6))
))

for {
  (someString, listOfInts) <- anRDD
  someInt <- listOfInts
} yield (someString, someInt)

Then I get this output:

 warning: `withFilter' method does not yet exist on org.apache.spark.rdd.RDD[(String, List[Int])], using `filter' method instead
  (s, li) <- rl

But it does still successfully return a FlatMappedRDD[(String, Int)]. Am I doing something wrong? Or is it safe to ignore this warning?

Update: I would also accept as an answer how the for-comprehension converts these operations to map/flatMap/filter calls, since I didn't think there'd be any filter or withFilter calls required. I assumed it would be equivalent to something similar to this:

anRDD.flatMap(tuple => tuple._2.map(someInt => (tuple._1, someInt)))

But this doesn't include any filter or withFilter calls, which seems to be the source of the warning.
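For reference, here is a sketch of what the compiler actually generates for the comprehension above (shown on a plain List so it runs without Spark; the same desugaring rules apply to the RDD, where the missing withFilter falls back to filter):

```scala
object Desugared {
  val data = List(
    ("a", List(1, 2, 3)),
    ("b", List(4)),
    ("c", List(5, 6))
  )

  // The comprehension as written in the question:
  val viaFor = for {
    (someString, listOfInts) <- data
    someInt <- listOfInts
  } yield (someString, someInt)

  // Roughly what the compiler emits: a withFilter (or filter, on an RDD)
  // checking the (someString, listOfInts) pattern, then a flatMap whose
  // body maps over the inner list.
  val desugared = data
    .withFilter { t => t != null } // the Tuple2 pattern can only fail on null
    .flatMap { case (someString, listOfInts) =>
      listOfInts.map(someInt => (someString, someInt))
    }
}
```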

Oh, I'm using Spark 1.2.0, Scala 2.10.4, and this is all within the REPL.


2 Answers

1
votes

First, I am no expert, but have done some digging and here is what I have found:

I compiled the code using -print (since JavaDecompiler was failing for some reason), which will print out the program with all Scala-specific features removed. There, I saw:

test.this.anRDD().filter({
    (new anonymous class anonfun$1(): Function1)
  }).flatMap({
    (new anonymous class anonfun$2(): Function1)
  }, ClassTag.apply(classOf[scala.Tuple2]));

You will notice the filter...so, I checked on the anonfun$1:

public final boolean apply(Tuple2<String, List<Object>> check$ifrefutable$1)
  {
    Tuple2 localTuple2 = check$ifrefutable$1;
    boolean bool;
    if (localTuple2 != null) {
      bool = true;
    } else {
      bool = false;
    }
    return bool;
  }

So, if you put all of this together, the filter happens because the comprehension has to check the (someString, listOfInts) pattern against each element: it filters out anything that does NOT match the Tuple2 pattern (which, given the RDD's element type, can only be null).
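This pattern check is easier to see with a pattern that can actually fail. A small sketch (plain Scala collections, no Spark needed): `for (Some(n) <- opts) yield n` desugars to roughly the following, and here the generated withFilter really does drop elements.

```scala
object RefutableCheck {
  val opts: List[Option[Int]] = List(Some(1), None, Some(3))

  // Hand-written equivalent of the desugaring of
  //   for (Some(n) <- opts) yield n
  // The withFilter is the pattern check; None fails the Some(n) pattern
  // and is filtered out, so the .get in the map is safe.
  val kept = opts.withFilter {
    case Some(_) => true
    case None    => false
  }.map(_.get)
}
```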

And the compiler prefers withFilter over filter when it is available, since withFilter is lazy and avoids building an intermediate collection before the flatMap. You can see this by decompiling a regular List instead of an RDD:

object test {
  val regList = List(
  ("a", List(1, 2, 3)), 
  ("b", List(4)),
  ("c", List(5, 6))
  )

val foo = for {
  (someString, listOfInts) <- regList
  someInt <- listOfInts
} yield (someString, someInt)
}

Which decompiles to:

test.this.regList().withFilter({
  (new anonymous class anonfun$1(): Function1)
}).flatMap({
  (new anonymous class anonfun$2(): Function1)
}, immutable.this.List.canBuildFrom()).$asInstanceOf[List]();

So, it is the same thing, except it uses withFilter where it can.
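If the warning bothers you, one way to avoid it entirely is to skip the generator pattern and destructure inside flatMap instead, so the compiler never inserts a withFilter/filter call at all. A sketch (shown on a List so it runs without Spark; the same shape works on the RDD):

```scala
object NoFilterNeeded {
  val data = List(
    ("a", List(1, 2, 3)),
    ("b", List(4)),
    ("c", List(5, 6))
  )

  // Destructuring inside flatMap's function literal, rather than in a
  // for-generator, produces no pattern check and hence no filter call:
  val result = data.flatMap { case (someString, listOfInts) =>
    listOfInts.map(someInt => (someString, someInt))
  }
}
```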

-2
votes

Call collect() on the RDD before using it in the comprehension. Note that collect() pulls the entire dataset onto the driver as a local Array, so the comprehension no longer runs distributed.

val collectedList = anRDD.collect
for {
  (someString, listOfInts) <- collectedList
  someInt <- listOfInts
} yield (someString, someInt)