2
votes

I'm building a real-time pipeline that connects Spark Streaming with HBase. As part of this process, I have to run a filter on an HBase table, specifically a prefix filter, since I want to match the records whose key starts with a certain string.

The table I'm filtering is called "hm_notificaciones". I can connect successfully to the HBase shell and scan the table from the command line. Running the following command:

scan "hm_notificaciones"

I get the following records:

ROW                             COLUMN+CELL
 46948854-20180307              column=info_oferta:id_oferta, timestamp=1520459448795, value=123456
 46948854-20180312170423        column=info_oferta:id_establecimiento, timestamp=1520892403770, value=9999
 46948854-20180312170423        column=info_oferta:id_oferta, timestamp=1520892390858, value=123445
 46948854-20180312170536        column=info_oferta:id_establecimiento, timestamp=1520892422044, value=9239
 46948854-20180312170536        column=info_oferta:id_oferta, timestamp=1520892435173, value=4432
 46948854-20180313110824        column=info_oferta:id_establecimiento, timestamp=1520957374921, value=9990
 46948854-20180313110824        column=info_oferta:id_oferta, timestamp=1520957362458, value=12313

I've been trying to run a prefix filter using the HBase API. I'm writing some Scala code to connect to the API and apply the filter. The following code compiles and executes, but it returns an empty result:

def scanTable( table_name:String, family: String, search_key: String )= {
  val conf: Configuration = HBaseConfiguration.create()
  val connection: Connection = ConnectionFactory.createConnection(conf)

  // This is a test to verify that I can connect to the HBase API.
  // These statements work and print all the table names in HBase.
  val admin = connection.getAdmin

  println("Listing all tablenames")
  val list_table_names = admin.listTableNames()
  list_table_names.foreach(println)

  val table: Table = connection.getTable( TableName.valueOf(table_name) )
  //val htable = new HTable(conf, tableName)

  var colValueMap: Map[String, String] = Map()
  var keyColValueMap: Map[String, Map[String, String]] = Map()
  val prefix = Bytes.toBytes(search_key)
  val scan = new Scan(prefix)
  scan.addFamily(Bytes.toBytes(family))
  val prefix_filter = new PrefixFilter(prefix)
  scan.setFilter(prefix_filter)
  val scanner = table.getScanner(scan)

  for( row <- scanner){
    val content = row.getNoVersionMap
    for( entry <- content.entrySet ){
      for( sub_entry <- entry.getValue.entrySet){
        colValueMap += (Bytes.toString( sub_entry.getKey) -> Bytes.toString(sub_entry.getValue) )
      }
      keyColValueMap += (Bytes.toString(row.getRow) -> colValueMap )
    }
  }

  //this doesn't execute 
  for( ( k, v) <- colValueMap) {
    printf("key: %s, value: %s\n", k, v)
  }

  //this never executes since scanner is null (or empty)
  for (result <- scanner) {
    for (cell <- result.rawCells) {
      println("Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
    }
  }

  scanner.close
  table.close
  connection.close

}

I've tried two approaches to print/get the data: composing a Map and iterating over the ResultScanner. However, it seems that my filter is not working since it's returning a null/empty set.

Do you know if there is an alternative way to execute a prefix filter on HBase?

The code I'm using to test the above code is the following:

val user_key = "46948854-20181303144609"
scanTable("hm_notificaciones", "info_oferta", user_key)
1
Maybe I'm misunderstanding, but the key 46948854-20181303144609 isn't in the data returned by scan "hm_notificaciones". Is that just a subset of the returned data? - Ben Watson
Yeah, that is what I am thinking. Try with user_key = "46948854". - surajz

1 Answer

1
votes

The second loop is never entered because the first loop has already iterated the scanner to the end. A ResultScanner can only be read once.

  for (result <- scanner) {
    for (cell <- result.rawCells) {
      println("Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
    }
  }
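The single-pass behaviour can be reproduced without a cluster. Below is a minimal sketch in plain Scala in which an ordinary Iterator stands in for the ResultScanner. SinglePassDemo and drain are hypothetical names used only for illustration, and no HBase API is involved.

```scala
object SinglePassDemo {
  // Drains the iterator into a List and reports whether anything is left in it.
  def drain(rows: Iterator[String]): (List[String], Boolean) = {
    val collected = rows.toList   // the first pass consumes every element
    (collected, rows.hasNext)     // the iterator is now exhausted
  }

  def main(args: Array[String]): Unit = {
    // Row keys copied from the scan output in the question.
    val rows = Iterator("46948854-20180307", "46948854-20180312170423")
    val (collected, anythingLeft) = drain(rows)
    println(s"collected ${collected.size} rows, anything left: $anythingLeft")

    // A materialized List, unlike the iterator, can be traversed repeatedly.
    collected.foreach(println)
    collected.foreach(println)
  }
}
```

If the rows are needed more than once, collecting them into a List (or a Map, as the question already does) during the first pass avoids a second scan.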

To print the results, use keyColValueMap. It worked for me, so check your prefix filter again.

  for ((k, v) <- keyColValueMap) {
    printf("key: %s, value: %s\n", k, v)
  }
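As the comments under the question point out, the search key 46948854-20181303144609 does not appear among the scanned row keys, so no row can start with it. One way to sanity-check a prefix before involving HBase is to test it against known row keys with plain string matching. This is only a sketch: it assumes ASCII keys like the ones shown, and PrefixCheck and matching are hypothetical names, not part of any HBase API.

```scala
object PrefixCheck {
  // Distinct row keys taken from the scan output in the question.
  val rowKeys: List[String] = List(
    "46948854-20180307",
    "46948854-20180312170423",
    "46948854-20180312170536",
    "46948854-20180313110824"
  )

  // Returns the row keys that start with the given prefix.
  def matching(prefix: String): List[String] =
    rowKeys.filter(_.startsWith(prefix))

  def main(args: Array[String]): Unit = {
    println(matching("46948854-20181303144609")) // full key used in the question
    println(matching("46948854"))                // shorter prefix suggested in the comments
  }
}
```

With the full key nothing matches, while the shorter prefix suggested in the comments selects every row for that user.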