
I have a problem with Spark Scala converting an Iterable (CompactBuffer) to individual pairs. I want to create a new RDD with key-value pairs of the ones in the CompactBuffer.

It looks like this:

CompactBuffer(Person2, Person5)
CompactBuffer(Person2, Person5, Person7)
CompactBuffer(Person1, Person5, Person11)

The CompactBuffers could contain more than three persons. Basically what I want is a new RDD with all the pairwise combinations from each CompactBuffer, like this (I also want to avoid duplicate pairs):

Array[
<Person2, Person5>
<Person5, Person2>
<Person2, Person7>
<Person7, Person2>
<Person5, Person7>
<Person7, Person5>
<Person1, Person5>
<Person5, Person1>
<Person1, Person11>
<Person11, Person1>
<Person5, Person11>
<Person11, Person5>]

Can someone help me?

Thank you in advance

So you want all pairs of items from each CompactBuffer, but you don't want to repeat any pairs that came from earlier CompactBuffers? – The Archetypal Paul
That's what I want indeed. – Tooltje
That's not going to parallelize well, since the output from any one CompactBuffer depends on the history of all previous ones, so it would have to run on one node. How many distinct pairs do you expect? – The Archetypal Paul
It will be a lot. Maybe just skip the non-repeating part then; I can try to deal with that later. – Tooltje
Actually, I am wrong. If you calculate all the pairs, RDDs have a distinct method, which I assume is distributed. See my answer. – The Archetypal Paul

1 Answer


Here's something that produces the pairs (and removes repeated ones). I couldn't work out how to construct a CompactBuffer directly, so this uses ArrayBuffer; the source for CompactBuffer describes it as a more efficient ArrayBuffer. You may need to convert your CompactBuffer inside the flatMap to something that supports .combinations.

object sparkapp extends App {
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.SparkConf
  import scala.collection.mutable.ArrayBuffer

  val data = List(
    ArrayBuffer("Person2", "Person5"),
    ArrayBuffer("Person2", "Person5", "Person7"),
    ArrayBuffer("Person1", "Person5", "Person11"))

  val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
  val sc = new SparkContext(conf)

  val dataRDD = sc.makeRDD(data, 1)

  // For each buffer, emit every unordered 2-combination in both orderings,
  // then de-duplicate across the whole RDD.
  val pairs = dataRDD.flatMap(
    ab => ab.combinations(2)
            .flatMap { case ArrayBuffer(x, y) => List((x, y), (y, x)) }
  ).distinct

  pairs.foreach(println)
}

Output

(Person7,Person2)
(Person7,Person5)
(Person5,Person2)
(Person11,Person1)
(Person11,Person5)
(Person2,Person7)
(Person5,Person7)
(Person1,Person11)
(Person2,Person5)
(Person5,Person11)
(Person1,Person5)
(Person5,Person1)
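The core transformation doesn't depend on Spark, so you can sanity-check it on plain Scala collections first. A minimal sketch (hypothetical sample data, only the first two buffers from the question):

```scala
import scala.collection.mutable.ArrayBuffer

object PairsSketch extends App {
  val data = List(
    ArrayBuffer("Person2", "Person5"),
    ArrayBuffer("Person2", "Person5", "Person7"))

  // combinations(2) yields each unordered pair once; the inner flatMap
  // adds both orderings, and distinct drops pairs repeated across buffers.
  val pairs = data.flatMap(
    ab => ab.combinations(2)
            .flatMap { case ArrayBuffer(x, y) => List((x, y), (y, x)) }
  ).distinct

  pairs.foreach(println)
}
```

On the RDD version, distinct runs as a distributed operation (it involves a shuffle), so the de-duplication shouldn't force everything onto one node.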