I am writing a project that receives data from Kafka and writes it into an HBase table. Because I need the differential between records, I first have to fetch the record with the same rowkey from HBase, subtract it from the newly received record, and finally save the new record back into the HBase table.
At first, I tried to use newAPIHadoopRDD to get data from HBase. Here is my attempt:
// configuration for a secure HBase cluster
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", zkQuorum)
conf.set("hbase.master", masterAddr)
conf.set("hbase.zookeeper.property.clientPort", portNum)
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName)

// read the table as an RDD of (rowkey, Result) pairs
val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf,
  classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
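On top of HbaseRDD, extracting the column value is roughly this (a simplified sketch, using the same cfName/colName placeholders as above):

// Simplified: pull the single column value out of each (rowkey, Result) pair
val values = HbaseRDD.map { case (_, result) =>
  new String(result.getValue(cfName.getBytes, colName.getBytes))
}
values.collect().foreach(println)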
This way, I am able to get the values of the records for a specific column family and column, but ONLY ONCE. By "only once" I mean that this snippet is executed each time I start my Spark Streaming application, so I get a value then, but it is never executed again afterwards. Since I want to read my records from HBase (by column family and column) every time I receive a record from Kafka, this does not work for me.
To solve this, I moved the logic into foreachRDD(), but unfortunately the SparkContext does not seem to be serializable: I got an error like "Task not serializable".
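The attempt was roughly shaped like this (a simplified sketch, not the exact code):

// This fails with "Task not serializable": ssc (and the driver-side conf)
// get captured into the map closure that Spark ships to the executors.
streamData.foreachRDD { rdd =>
  rdd.map { line =>
    val hbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    // ... look up the old record, compute the difference ...
  }
}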
Finally, I found there is another way to read data from HBase, using hbase.client HTable. So this is my final version:
def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = {
  val conf = HBaseConfiguration.create()
  conf.set("zookeeper.znode.parent", "/hbase-secure")
  conf.set("hbase.zookeeper.quorum", "xxxxxx")
  conf.set("hbase.master", "xxxx")
  conf.set("hbase.zookeeper.property.clientPort", "xxx")
  conf.set(TableInputFormat.INPUT_TABLE, "xx")
  conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx")

  // open the table and scan the column I need
  val testTable = new HTable(conf, "testTable")
  val scan = new Scan()
  scan.addColumn("cf1".getBytes, "test".getBytes)
  val rs = testTable.getScanner(scan)

  // collect the existing values into one string
  val sb = new StringBuilder
  var r = rs.next()
  while (r != null) {
    sb.append(new String(r.getValue("cf1".getBytes, "test".getBytes)))
    r = rs.next()
  }
  rs.close()
  testTable.close()
  val res = sb.toString

  //do the following manipulations and return object (ImmutableBytesWritable, Put)
  ..............................
  .......................
}
In the main method, I use the above method inside foreachRDD and save into HBase with saveAsNewAPIHadoopDataset:
streamData.foreachRDD(stream => stream.map(transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration))
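(For reference, job is set up along these lines, simplified, with the output table name as a placeholder:)

// Simplified sketch of the output-side Job setup ("testTable" is a placeholder)
val job = Job.getInstance(HBaseConfiguration.create())
job.getConfiguration.set("zookeeper.znode.parent", "/hbase-secure")
job.getConfiguration.set("hbase.zookeeper.quorum", "xxxxxx")
job.getConfiguration.set("hbase.zookeeper.property.clientPort", "xxx")
job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "testTable")
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])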
This works fine for me now, but I have questions about this process:
With this approach, I guess a connection to HBase is created for every partition of the RDD (or even for every record, since transferToHBasePut is called inside map). I am wondering whether my app can scale up: say I receive more than 1000 records in one second, it looks like 1000 connections would be set up in my Spark Streaming application.
Is this the correct way to read data from HBase? What is the best practice for reading from HBase in Spark Streaming? Or is Spark Streaming not supposed to read any data at all, and only designed to write stream data into a DB?
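For example, would moving the connection setup into mapPartitions, roughly like the sketch below, be closer to best practice? (transferToHBasePutWithTable is a hypothetical variant of transferToHBasePut that reuses an already-open table instead of creating its own.)

// Sketch: one HBase connection per partition instead of one per record.
// transferToHBasePutWithTable is hypothetical: a variant of transferToHBasePut
// that takes the already-open HTable as a parameter.
streamData.foreachRDD { rdd =>
  rdd.mapPartitions { lines =>
    val conf = HBaseConfiguration.create()
    conf.set("zookeeper.znode.parent", "/hbase-secure")
    conf.set("hbase.zookeeper.quorum", "xxxxxx")
    conf.set("hbase.zookeeper.property.clientPort", "xxx")
    val table = new HTable(conf, "testTable")
    val puts = lines.map(line => transferToHBasePutWithTable(line, table)).toList
    table.close()
    puts.iterator
  }.saveAsNewAPIHadoopDataset(job.getConfiguration)
}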
Thanks in advance.