I am new to Spark. Could someone help me find a way to combine two RDDs into a final RDD according to the logic below, in Scala, preferably without using SQLContext (DataFrames)?

RDD1 = column1, column2, column3 (362825 records)
RDD2 = column2_distinct (the same values as column2 from RDD1, but distinct), column4 (2621 records)
Final RDD = column1, column2, column3, column4

Example:
RDD1 =
userid | progid | rating
a      | 001    | 5
b      | 001    | 3
b      | 002    | 4
c      | 003    | 2

RDD2 =
progid (distinct) | id
001               | 1
002               | 2
003               | 3

Final RDD =
userid | progid | id | rating
a      | 001    | 1  | 5
b      | 001    | 1  | 3
b      | 002    | 2  | 4
c      | 003    | 3  | 2
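My understanding is that a plain-RDD version of this join could be a minimal sketch like the one below, assuming pairrdd1 is keyed by the "userid,progid" string with the rating as value and pairrdd2 is keyed by progid with the id as value (keyedByProgid, joined, and finalRdd are illustrative names):

// Re-key RDD1 by progid so it can be joined with RDD2
val keyedByProgid = pairrdd1.map { case (key, rating) =>
  val parts = key.split(",")
  (parts(1), (parts(0), rating)) // (progid, (userid, rating))
}
// Inner join on progid via PairRDDFunctions.join
val joined = keyedByProgid.join(pairrdd2) // (progid, ((userid, rating), id))
// Flatten to the final shape
val finalRdd = joined.map { case (progid, ((userid, rating), id)) =>
  (userid, progid, id, rating)
}

Note that join here is the standard pair-RDD inner join, so it keeps only progid keys that appear on both sides.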
My attempt so far (using DataFrames):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Flatten the pair RDDs into comma-separated lines
val rawRdd1 = pairrdd1.map(x => x._1.split(",")(0) + "," + x._1.split(",")(1) + "," + x._2) // 362825 records
val rawRdd2 = pairrdd2.map(x => x._1 + "," + x._2) // 2621 records

// Build string-typed schemas for the two DataFrames
val schemaString1 = "userid programid rating"
val schemaString2 = "programid id"
val fields1 = schemaString1.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val fields2 = schemaString2.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema1 = StructType(fields1)
val schema2 = StructType(fields2)

// Re-split the lines into Rows and create the DataFrames
val rowRDD1 = rawRdd1.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1), attributes(2)))
val rowRDD2 = rawRdd2.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1)))
val DF1 = sparkSession.createDataFrame(rowRDD1, schema1)
val DF2 = sparkSession.createDataFrame(rowRDD2, schema2)

DF1.createOrReplaceTempView("df1")
DF2.createOrReplaceTempView("df2")

// DataFrame API join (equivalent to the SQL below; currently unused)
val resultDf = DF1.join(DF2, Seq("programid"))
// SQL join on programid
val DF3 = sparkSession.sql("""SELECT df1.userid, df1.programid, df2.id, df1.rating FROM df1 JOIN df2 ON df1.programid = df2.programid""")

println(DF1.count()) // 362825 records
println(DF2.count()) // 2621 records
println(DF3.count()) // only 297 records
I am expecting the same number of records as DF1, with a new column (id) attached from DF2 holding the id value corresponding to each programid.
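For what it's worth, a drop from 362825 to 297 matched rows usually suggests the programid values on the two sides don't compare equal (for example, stray whitespace introduced by the string round-trip). A quick sanity check like this sketch, run against DF1/DF2 as built above, would make that visible:

// Print a few distinct keys from each side, bracketed so any
// leading/trailing whitespace in the values is visible
DF1.select("programid").distinct().take(5).foreach(r => println(s"[${r.getString(0)}]"))
DF2.select("programid").distinct().take(5).foreach(r => println(s"[${r.getString(0)}]"))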