3
votes

I am writing a project that receives data from Kafka and writes it into an HBase table. Because I want to know the differential between records, I need to first fetch the existing record with the same rowkey from HBase, subtract it from the received record, and finally save the new record back into the HBase table.
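
To make the intent concrete, here is a minimal sketch of that read-subtract-write step for a single record. It assumes a long counter stored in a placeholder column cf1:test of a placeholder table testTable and uses the newer HBase client API (ConnectionFactory/Table); it is not my actual code:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes

// Sketch: read the current value for a rowkey, subtract it from the newly
// received value, and write the difference back to the same cell.
def writeDelta(rowkey: String, receivedValue: Long): Unit = {
  val conf = HBaseConfiguration.create()
  val connection = ConnectionFactory.createConnection(conf)
  val table = connection.getTable(TableName.valueOf("testTable"))
  try {
    val result = table.get(new Get(Bytes.toBytes(rowkey)))
    val oldBytes = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("test"))
    val oldValue = if (oldBytes == null) 0L else Bytes.toLong(oldBytes)

    val put = new Put(Bytes.toBytes(rowkey))
    put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("test"),
      Bytes.toBytes(receivedValue - oldValue))
    table.put(put)
  } finally {
    table.close()
    connection.close()
  }
}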

At first, I tried to use newAPIHadoopRDD to get data from HBase. Here is my attempt:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}

val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", zkQuorum)
conf.set("hbase.master", masterAddr)
conf.set("hbase.zookeeper.property.clientPort", portNum)
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName)

// Scan the table as an RDD of (rowkey, Result) pairs
val hbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

This way, I am able to get the values of records with a specific column family and column name, but ONLY ONCE. By "only once" I mean that this snippet is executed each time I start my Spark Streaming application, so I get a value then, but it is never executed again afterwards. Since I want to read records from HBase by column family and column every time I receive a record from Kafka, this does not work for me.

To solve this, I moved the logic into foreachRDD(), but unfortunately sparkContext does not seem to be serializable. I got an error like "Task not serializable".

Finally, I found that there is another way to read data from HBase, using the hbase.client HTable. So this is my final version:

def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = {
    val conf = HBaseConfiguration.create()
    conf.set("zookeeper.znode.parent", "/hbase-secure")
    conf.set("hbase.zookeeper.quorum", "xxxxxx")
    conf.set("hbase.master", "xxxx")
    conf.set("hbase.zookeeper.property.clientPort", "xxx")
    conf.set(TableInputFormat.INPUT_TABLE, "xx")
    conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx")

    val testTable = new HTable(conf, "testTable")
    val scan = new Scan
    scan.addColumn("cf1".getBytes, "test".getBytes)
    val rs = testTable.getScanner(scan)

    // Concatenate the values of cf1:test from all rows returned by the scan
    val sb = new StringBuilder
    var r = rs.next()
    while (r != null) {
      val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes))
      sb.append(tmp)
      r = rs.next()
    }
    rs.close()
    testTable.close()
    val res = sb.toString

    //do the following manipulations and return object (ImmutableBytesWritable, Put)
    ..............................
    .......................
}

In the main method, I use the above method inside foreachRDD and save into HBase with saveAsNewAPIHadoopDataset:

streamData.foreachRDD(stream => stream.map(transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration))
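
For reference, the job used above is configured roughly like this (a sketch only; I assume the standard TableOutputFormat setup and reuse the conf and tableName from earlier):

import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

// The Job's configuration tells saveAsNewAPIHadoopDataset which HBase table
// to write to and which output format to use.
val job = Job.getInstance(conf)
job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])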

This works fine for me now, but I have questions about this process:

In this way, I guess, a connection to HBase would be created for every partition of the RDD. I am wondering whether it is possible to scale my app up: if I have more than 1000 records per second, it looks like 1000 connections would be set up in my Spark Streaming application.

Is this the correct way to read data from HBase? What is the best practice for reading from HBase in Spark Streaming? Or is Spark Streaming not supposed to read any data at all, and only designed to write stream data into a DB?

Thanks in advance.

2 Answers

3
votes

After some learning, I now create a configuration for each partition of the RDD. Check the design patterns for using foreachRDD in the official Spark Streaming programming guide. Note that a Configuration is not a connection, so I still don't know how to get a connection from an existing connection pool to get and put records for HBase.
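
Roughly, the resulting pattern looks like this (a sketch only; the table name is a placeholder, the per-record delta logic is the same as in the question, and I use the newer ConnectionFactory/Table API here):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

streamData.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // create the HBase configuration/connection once per partition, not per record
    val conf = HBaseConfiguration.create()
    conf.set("zookeeper.znode.parent", "/hbase-secure")
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("testTable"))

    records.foreach { line =>
      // per-record get / subtract / put against the table goes here
    }

    table.close()
    connection.close()
  }
}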

0
votes

The functions you call inside foreachRDD execute in the individual executors' JVM processes. At a minimum, you can use a singleton instance of the conf in the transferToHBasePut method (i.e., do a null check and reuse the conf already created in that JVM process instead of building a new one each time). This will reduce the number of HBase connections to the number of executors spawned in your Spark cluster.
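
For example, a lazily initialized, JVM-wide singleton could look like this (a sketch; the object name and settings are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration

// One Configuration per executor JVM: Scala's lazy val gives you the
// "check for an existing instance, otherwise create it once" behavior.
object HBaseConf {
  lazy val conf: Configuration = {
    val c = HBaseConfiguration.create()
    c.set("zookeeper.znode.parent", "/hbase-secure")
    c.set("hbase.zookeeper.quorum", "xxxxxx")
    c
  }
}

// Inside transferToHBasePut, use HBaseConf.conf instead of creating a new one:
//   val testTable = new HTable(HBaseConf.conf, "testTable")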

Hope this helps...