
I have a huge table in HBase, with potentially millions of rows. [HBase Table Structure][1] I am trying to access chunks of the table, using STARTROW and ENDROW with the sc.newAPIHadoopRDD function. I am trying to find a way to get the column qualifier names from the resulting RDD. As each row can have any number of columns and column qualifiers, I want to get the column families and qualifiers of each row, by rowkey. In short, I want to create a DataFrame in Spark which looks somewhat like this:

ROWKEY  COLUMN NAME                     VALUE
ROW1    ColumnFamily:ColumnQualifier1   Value="XX"
ROW1    ColumnFamily:ColumnQualifier2   Value="XX"
ROW1    ColumnFamily:ColumnQualifier3   Value="XX"
ROW1    ColumnFamily:ColumnQualifier4   Value="XX"
ROW1    ColumnFamily:ColumnQualifier5   Value="XX"
ROW1    ColumnFamily:ColumnQualifier6   Value="XX"
ROW2    ColumnFamily:ColumnQualifier1   Value="XX"
ROW2    ColumnFamily:ColumnQualifier2   Value="XX"
ROW2    ColumnFamily:ColumnQualifier3   Value="XX"
ROW2    ColumnFamily:ColumnQualifier4   Value="XX"
ROW3    ColumnFamily:ColumnQualifier1   Value="XX"
ROW4    ColumnFamily:ColumnQualifier2   Value="XX"

So, from the RDD returned by sc.newAPIHadoopRDD, I want to know of a way to access the column names. Once I have the column qualifier, I can get the value for a family:qualifier combination by calling getValue(family, qualifier) on each Result in the RDD.

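import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import scala.collection.JavaConverters._

val kvRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
val resultRDD = kvRDD.map(tuple => tuple._2)
val keyValueRDD = resultRDD.map(result => {
    var resultStrings: List[Object] = List()
    // getNoVersionMap returns family -> (qualifier -> value), all as raw byte arrays
    val navigablemap = result.getNoVersionMap()
    // asScala lets the for loop iterate over the Java collection
    val vallist = navigablemap.values().asScala
    for (each <- vallist) {
        // each is still a map of byte arrays; nothing is decoded to String here
        resultStrings = resultStrings ::: List(each)
    }
    resultStrings
})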

But this returns an RDD in which each row appears to be encrypted (the values are unreadable). Any help with the Scala code is greatly appreciated. Thanks.

To the best of my knowledge, HBase stores everything as byte arrays. Are you sure it's not a byte array that you are getting in return? - Vikas Saxena

1 Answer

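import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.functions
// toDF() and $"..." need the session implicits in scope,
// e.g. import spark.implicits._ (sqlContext.implicits._ on Spark 1.x)

val dataFrame = kvRDD.map(x => {
      val rowkey = Bytes.toString(x._2.getRow)
      // getNoVersionMap: family -> (qualifier -> latest value), all as byte arrays
      val families = x._2.getNoVersionMap.keySet().iterator()
      val columns = scala.collection.mutable.Map[String, String]()
      while (families.hasNext) {
        val family = families.next()
        val qualifiers = x._2.getFamilyMap(family).keySet().iterator()
        while (qualifiers.hasNext) {
          val qualifier = qualifiers.next()
          // key is "family:qualifier", value is the cell content decoded as a String
          columns += ((Bytes.toString(family) + ":" + Bytes.toString(qualifier)) -> Bytes.toString(x._2.getValue(family, qualifier)))
        }
      }
      // convert to an immutable Map so toDF() encodes the second field as a map column
      (rowkey, columns.toMap)
    }).toDF()
    // explode turns each map entry into its own row with two columns (key, value)
    dataFrame.select($"_1".alias("ROWKEY"), functions.explode($"_2").as(List("COLUMN NAME", "VALUE"))).show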
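
A possible alternative, sketched here under some assumptions: instead of building a map per row and exploding it afterwards, each Result can be flattened directly into (rowkey, family:qualifier, value) tuples through the Cell API (Result.rawCells and the CellUtil.clone* helpers). The helper name flattenResult is hypothetical and not part of HBase or of the code above. It also assumes the values are UTF-8 strings and that the scan returns a single version per column; with more versions, rawCells would yield one tuple per version.

```scala
import org.apache.hadoop.hbase.{Cell, CellUtil}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical helper (not from the original post): one tuple per cell of the row.
def flattenResult(result: Result): Seq[(String, String, String)] = {
  val rowkey = Bytes.toString(result.getRow)
  // rawCells() can return null for an empty Result, so guard against that
  val cells = Option(result.rawCells()).getOrElse(Array.empty[Cell])
  cells.toSeq.map { cell =>
    // clone* copies the relevant bytes out of the cell's backing array
    val family = Bytes.toString(CellUtil.cloneFamily(cell))
    val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
    // assumption: values were written as UTF-8 strings
    val value = Bytes.toString(CellUtil.cloneValue(cell))
    (rowkey, family + ":" + qualifier, value)
  }
}
```

With the kvRDD from the question and the session implicits in scope, something like kvRDD.flatMap(x => flattenResult(x._2)).toDF("ROWKEY", "COLUMN NAME", "VALUE") should give the three-column layout directly, without the explode step.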