
I have:

RDD1 with pairs of points that I would like to compare

(2,5), (3,7), ...

and RDD2 with each point's id and dimensions

(0,List(5,7)), (1,List(2,4)), ...

How can I look up the dimensions in the second RDD in order to compare the pairs of the first RDD?

(Both RDDs are large, so I cannot collect them.)
(A join does not seem to work, because the two RDDs have different schemas.)
https://www.mdpi.com/1999-4893/12/8/166/htm#B28-algorithms-12-00166

Are you looking for a solution in Java or Scala? - QuickSilver
I am looking for a solution in Scala (Spark) only. - Thenia Mak
Exactly what type of operation do you want to perform on those coordinates? Let's say they are (x1,y1) from df1 and (x2,y2) from df2: what should happen with them? E.g. x3 = x1-x2 and y3 = y1-y2, is that the output you are looking for? - QuickSilver
Each pair in RDD1 consists of the ids of two different points. If I have (id1,id2) in RDD1, and (id1,(x1,y1)) and (id2,(x2,y2)) in RDD2, I have to perform a domination check on the two points id1 and id2. - Thenia Mak

1 Answer


I've added a sample that joins the rows; I hope it works for you. The code also marks the placeholder where you can add or modify code for your own logic.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object JoinRdds {

  def main(args: Array[String]): Unit = {
    // Local session for this sample; adjust appName/master for your environment
    val spark = SparkSession.builder().appName("JoinRdds").master("local[*]").getOrCreate()

    import spark.implicits._
    val df2 = List((0, List(5, 7)), (1, List(2, 4))).toDF("id", "coordinates")  // 2nd DataFrame

    // 1st DataFrame with a generated row id. Note: monotonically_increasing_id()
    // is unique and increasing, but not guaranteed consecutive across partitions.
    val df1 = List((2, 5), (3, 7)).toDF("x", "y")
      .withColumn("id", monotonically_increasing_id())

//    df2.join(df1, df1("id") === df2("id"))    // perform inner join
//      .drop("id")  // drop the id column
//      .show(false)

    val rdd = df2.join(df1, df1("id") === df2("id")).rdd  // the joined rows as an RDD[Row]
    val resultCoordinates: Array[(Int, Int)] = rdd.map { row => // process the result row by row
      // Placeholder: add any per-row logic here; you can return any type
      val coordinates = row.getAs[Seq[Int]]("coordinates")
      val x = row.getAs[Int]("x")
      val y = row.getAs[Int]("y")
      (coordinates(0) - x, coordinates(1) - y)
    }.collect() // collect is only reasonable for this small sample
    resultCoordinates.foreach(r => println(s"(${r._1},${r._2})")) // print the result
    spark.stop()
  }

}
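
If the pairs in RDD1 are ids rather than coordinates (as the comments describe), one possible approach is to stay on the RDD API and join twice: key the pairs by the first id, join with RDD2, re-key by the second id, and join again. The following is only a minimal sketch under some assumptions: the names `PairDominance`, `dominates` and `withCoordinates` are hypothetical, the sample pairs are made up, and the domination check assumes the usual skyline definition with smaller values being better (no worse in every dimension, strictly better in at least one). Swap in your own check if yours differs.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object PairDominance {

  // Assumed definition (minimisation): a dominates b if a is no worse in
  // every dimension and strictly better in at least one.
  def dominates(a: Seq[Int], b: Seq[Int]): Boolean =
    a.length == b.length &&
      a.zip(b).forall { case (x, y) => x <= y } &&
      a.zip(b).exists { case (x, y) => x < y }

  // Attach the coordinates of both ids of each pair using two keyed joins.
  def withCoordinates(
      pairs: RDD[(Int, Int)],
      points: RDD[(Int, List[Int])]
  ): RDD[((Int, List[Int]), (Int, List[Int]))] =
    pairs
      .join(points)                                        // (id1, (id2, coords1))
      .map { case (id1, (id2, c1)) => (id2, (id1, c1)) }   // re-key by id2
      .join(points)                                        // (id2, ((id1, coords1), coords2))
      .map { case (id2, ((id1, c1), c2)) => ((id1, c1), (id2, c2)) }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PairDominance").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical sample data: id pairs, and id -> coordinates
    val pairs  = sc.parallelize(Seq((0, 1), (1, 0)))
    val points = sc.parallelize(Seq((0, List(5, 7)), (1, List(2, 4))))

    val result = withCoordinates(pairs, points).map {
      case ((id1, c1), (id2, c2)) => (id1, id2, dominates(c1, c2))
    }

    // collect() is used only because the demo data is tiny
    result.collect().foreach(println)
    spark.stop()
  }
}
```

Both joins are inner joins, so a pair whose id is missing from RDD2 is silently dropped. Nothing is collected to the driver except in the final demo print, which you would replace with something like `saveAsTextFile` or further transformations on real data.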