I'm building a real-time pipeline that connects Spark Streaming with HBase. As part of this process, I have to run a filter on an HBase table, specifically a prefix filter, since I want to match the records whose row key starts with a certain string.
The table I'm filtering is called "hm_notificaciones". I can connect successfully to the HBase shell and scan the table from the command line. Running the following command:
scan "hm_notificaciones"
I get the following records:
ROW COLUMN+CELL
46948854-20180307 column=info_oferta:id_oferta, timestamp=1520459448795, value=123456
46948854-20180312170423 column=info_oferta:id_establecimiento, timestamp=1520892403770, value=9999
46948854-20180312170423 column=info_oferta:id_oferta, timestamp=1520892390858, value=123445
46948854-20180312170536 column=info_oferta:id_establecimiento, timestamp=1520892422044, value=9239
46948854-20180312170536 column=info_oferta:id_oferta, timestamp=1520892435173, value=4432
46948854-20180313110824 column=info_oferta:id_establecimiento, timestamp=1520957374921, value=9990
46948854-20180313110824 column=info_oferta:id_oferta, timestamp=1520957362458, value=12313
I've been trying to run a prefix filter using the HBase API. I'm writing some Scala code to connect to the API and apply the filter. The following code compiles and executes, but it returns an empty result:
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Scan, Table}
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.util.Bytes

def scanTable(table_name: String, family: String, search_key: String) = {
  val conf: Configuration = HBaseConfiguration.create()
  val connection: Connection = ConnectionFactory.createConnection(conf)
  // This is a test to verify that I can connect to the HBase API.
  // These statements work and print all the table names in HBase.
  val admin = connection.getAdmin
  println("Listing all tablenames")
  val list_table_names = admin.listTableNames()
  list_table_names.foreach(println)
  val table: Table = connection.getTable(TableName.valueOf(table_name))
  //val htable = new HTable(conf, tableName)
  var colValueMap: Map[String, String] = Map()
  var keyColValueMap: Map[String, Map[String, String]] = Map()
  // Start the scan at the prefix and keep only rows whose key begins with it
  val prefix = Bytes.toBytes(search_key)
  val scan = new Scan(prefix)
  scan.addFamily(Bytes.toBytes(family))
  val prefix_filter = new PrefixFilter(prefix)
  scan.setFilter(prefix_filter)
  val scanner = table.getScanner(scan)
  // First approach: collect qualifier -> value pairs, keyed by row
  for (row <- scanner.asScala) {
    val content = row.getNoVersionMap
    for (entry <- content.entrySet.asScala) {
      for (sub_entry <- entry.getValue.entrySet.asScala) {
        colValueMap += (Bytes.toString(sub_entry.getKey) -> Bytes.toString(sub_entry.getValue))
      }
      keyColValueMap += (Bytes.toString(row.getRow) -> colValueMap)
    }
  }
  // this doesn't execute
  for ((k, v) <- colValueMap) {
    printf("key: %s, value: %s\n", k, v)
  }
  // this never executes since scanner is null (or empty)
  for (result <- scanner.asScala) {
    for (cell <- result.rawCells) {
      println("Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
    }
  }
  scanner.close()
  table.close()
  connection.close()
}
I've tried two approaches to print/get the data: composing a Map and iterating over the ResultScanner. However, it seems that my filter is not working since it's returning a null/empty set.
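A side note on the second loop: as far as I can tell, a ResultScanner is consumed as it is iterated, so a second pass over the same scanner would print nothing even if the first pass had found rows. The following is a minimal sketch of that single-pass behaviour using a plain Scala Iterator as a stand-in for the scanner; `SinglePassDemo` and `traverseTwice` are hypothetical names for illustration and are not part of the HBase API.

```scala
object SinglePassDemo {
  // Traverses the same iterator twice and returns what each pass yielded.
  // The iterator is only a stand-in for a scanner (an assumption, not HBase code).
  def traverseTwice[A](rows: Iterator[A]): (List[A], List[A]) = {
    val first = rows.toList  // consumes every element
    val second = rows.toList // the iterator is exhausted, so this is empty
    (first, second)
  }
}
```

If both the Map and the cell dump are needed, one option is to materialise the results once (for example with `toList`) and reuse that collection for both loops.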
Do you know if there is an alternative way to execute a prefix filter on HBase?
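One alternative that is often suggested is to express the prefix as a bounded range scan: start at the prefix and stop at the smallest key that no longer shares it. The sketch below only computes that stop row in plain Scala; `PrefixRange` and `stopRowForPrefix` are hypothetical names, and the result would still have to be passed to the scan as its stop row. I believe `Scan.setRowPrefixFilter` does something equivalent internally, but I haven't verified that against my HBase version.

```scala
object PrefixRange {
  // Hypothetical helper: returns the exclusive stop row for a prefix scan,
  // i.e. the prefix with its last incrementable byte increased by one.
  def stopRowForPrefix(prefix: Array[Byte]): Array[Byte] = {
    // Trailing 0xFF bytes cannot be incremented, so drop them first
    val trimmed = prefix.reverse.dropWhile(_ == 0xFF.toByte).reverse
    if (trimmed.isEmpty) Array.emptyByteArray // no upper bound: scan to the end of the table
    else trimmed.init :+ (trimmed.last + 1).toByte
  }
}
```

For the prefix "46948854" this gives the bytes of "46948855", so the scanned range would cover every row key beginning with "46948854".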
I'm testing the function above with the following call:
val user_key = "46948854-20181303144609"
scanTable("hm_notificaciones", "info_oferta", user_key)
46948854-20181303144609 isn't in the data returned by scan "hm_notificaciones". Is that just a subset of the returned data? – Ben Watson
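To make the comment's point concrete, here is a small sketch that applies plain string prefix matching to the row keys shown in the shell output. It is only a stand-in for what a prefix filter does, not HBase code; `PrefixCheck` and `matching` are hypothetical names, and the sketch assumes the shell output above is the complete table.

```scala
object PrefixCheck {
  // Row keys copied from the shell scan output in the question
  val rowKeys: Seq[String] = Seq(
    "46948854-20180307",
    "46948854-20180312170423",
    "46948854-20180312170536",
    "46948854-20180313110824"
  )

  // Keeps the row keys that start with the given prefix
  def matching(prefix: String): Seq[String] =
    rowKeys.filter(_.startsWith(prefix))
}
```

Under that assumption, `PrefixCheck.matching("46948854-20181303144609")` is empty because no listed key starts with the full test key, while `PrefixCheck.matching("46948854")` returns all four row keys.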