
I am using the nerdammer hbase-spark connector. I read two HBase tables as RDDs, convert them to DataFrames, and run SQL to join them; this works as expected.

One of the columns in one of the tables contains a JSON object, and I need to extract a specific JSON attribute's value in the final result. How can I do this? If column D of ARDD holds JSON data like [{"foo":"bar","baz":"qux"}], I need to create a new RDD or DataFrame in which this column contains only the value of "baz", so that when I join I get just that attribute's value.
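To make the expected output concrete, this is the extraction I need, shown on the sample value in plain Scala (a standalone json4s sketch, separate from my Spark code below):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    val sample = """[{"foo":"bar","baz":"qux"}]"""
    // "\" digs into the array's objects and returns the matching field's value
    val baz = compact(parse(sample) \ "baz") // "qux" (still JSON-encoded, i.e. quoted)

My current code: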

    val ARDD = sc.hbaseTable[(Option[String], Option[String], Option[String], Option[String], Option[String], Option[String])](ATableName)
      .select("A", "B", "C", "D", "E").inColumnFamily("pCF")

    val BRDD = sc.hbaseTable[(Option[String], Option[String], Option[String], Option[String], Option[String], Option[String], Option[String])](BTableName)
      .select("A", "B", "C", "D", "E", "F").inColumnFamily("tCF")


    // register both DataFrames as temp tables so they can be queried with SQL
    val ADF = sqlContext.createDataFrame(ARDD)
    val BDF = sqlContext.createDataFrame(BRDD)
    ADF.registerTempTable("aDF")
    BDF.registerTempTable("bDF")

    val resultset = sqlContext.sql("SELECT aDF._1, bDF._2, bDF._3, bDF._4, bDF._5, bDF._6, bDF._3, aDF._1, aDF._2, bDF._1 FROM aDF, bDF WHERE aDF._5 = bDF._7").collect()

    resultset.foreach(println)
    println("Count " + resultset.length)

1 Answer


I created a UDF to achieve this and added a new column to my DataFrame that holds the parsed value:

    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    import org.apache.spark.sql.functions.udf

    // UDF that parses the JSON string in a column and returns the value of "myField"
    def udfScoreToCategory = udf((t: String) => compact(parse(t) \ "myField"))


    // Add the parsed value as a new column "_p" and register the result for SQL
    val abc = myDF.withColumn("_p", udfScoreToCategory(myDF("_4")))
    abc.registerTempTable("odDF")
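The parsed value can then be used like any other column, for example (table and column names assumed from the snippet above):

    // Select the row key and the extracted JSON field from the registered temp table
    val parsed = sqlContext.sql("SELECT _1, _p FROM odDF")
    parsed.show()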