In Spark (Scala), I have an RDD, rdd1, in which each element represents a single entry of a matrix A:

val rdd1 = dist.map{case (((x,y),z,v)) => ((x,y),v)}

x represents the row, y represents the column and v represents the value in matrix A.

I also have another RDD, rdd2, of the form RDD[(index, Array[(x, y)])], where the array in each element lists the coordinates of the matrix A entries (stored in rdd1) that are needed for that element's index.

What I need is to look up the value of each of those matrix A entries for every index, preserving all of the data: index, (x, y), and v. What would be a good approach?

1 Answer

If I understand correctly, your question boils down to:

val valuesRdd = sc.parallelize(Seq(
//((x, y), v)
  ((0, 0), 5.5),            
  ((1, 0), 7.7)
))

val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)])
  (123, Array((0, 0), (1, 0))) 
))

And you want to merge these RDDs to get all values as (index, (x, y), v), which in this case means (123, (0, 0), 5.5) and (123, (1, 0), 7.7)?

You can definitely do this with join, since both RDDs share the key (x, y). However, one of them holds an Array[(x, y)] per row, so you first have to explode that array into one row per coordinate:

val explodedIndices = indicesRdd.flatMap { case (index, coords) =>
  coords.map { case (x, y) => (index, (x, y)) }
}
// Each row exploded into multiple rows (index, (x, y))

val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)}
// Each row keyed by the coordinates (x, y)

val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)}
// Each row keyed by the coordinates (x, y)

// Because we have common keys, we can join!
val joined = keyedIndices.join(keyedValues)
// Each row has the shape ((x, y), ((index, (x, y)), ((x, y), v)))
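
The joined rows are still nested, so one more map is needed to get the (index, (x, y), v) shape asked for. For the joined RDD above, something like joined.map { case (_, ((index, coord), (_, v))) => (index, coord, v) } should do it. Below is a minimal, more compact sketch of the whole pipeline. It assumes Int coordinates and indices with Double values, as in the sample data, and the names MatrixLookup and attachValues are made up for illustration. It explodes the indices directly into ((x, y), index) pairs, so neither side needs a separate keyBy:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object; the names are illustrative only.
object MatrixLookup {

  // For every (index, coordinates) row, look up the matrix value at each
  // coordinate and emit one flat (index, (x, y), v) row per match.
  def attachValues(
      values: RDD[((Int, Int), Double)],
      indices: RDD[(Int, Array[(Int, Int)])]
  ): RDD[(Int, (Int, Int), Double)] = {
    // Explode each array into ((x, y), index) pairs, keyed by coordinate.
    val keyedIndices = indices.flatMap { case (index, coords) =>
      coords.map(coord => (coord, index))
    }
    // values is already keyed by (x, y), so it can be joined as-is.
    // An inner join drops coordinates that have no entry in values.
    keyedIndices.join(values).map { case (coord, (index, v)) =>
      (index, coord, v)
    }
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for trying the sketch out.
    val conf = new SparkConf().setAppName("matrix-lookup").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val valuesRdd = sc.parallelize(Seq(((0, 0), 5.5), ((1, 0), 7.7)))
      val indicesRdd = sc.parallelize(Seq((123, Array((0, 0), (1, 0)))))
      // Row order after a join is not guaranteed.
      attachValues(valuesRdd, indicesRdd).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

With the sample data this yields the two rows (123, (0, 0), 5.5) and (123, (1, 0), 7.7), in no particular order. If some coordinates in rdd2 might be missing from rdd1 and you still want to keep them, leftOuterJoin would return the value as an Option instead of dropping the row.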