2
votes

I have a use case where I'm using Kafka streaming to listen to a topic and count all the words and their number of occurrences. I want to store the word counts in HBase every time an RDD is created from the DStream.

Here's the code I'm using to read the topic, which works just fine and gives me an RDD of (String, Long):

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val broker = "localhost:9092"
val zk = "localhost:2181"
val topic = "sparktest"

val sparkConf = new SparkConf().setAppName("KafkaHBaseWordCount").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
// Build the StreamingContext from the existing SparkContext so we don't need
// spark.driver.allowMultipleContexts to work around a second context being created.
val ssc = new StreamingContext(sc, Seconds(10))

val kafkaConf = Map(
  "metadata.broker.list" -> broker,
  "zookeeper.connect" -> zk,
  "group.id" -> "kafka-spark-streaming-example",
  "zookeeper.connection.timeout.ms" -> "1000")

// Receiver-based stream; keep only the message value (the second element of each tuple).
val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
  ssc, kafkaConf, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)

val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

I would now like to update the word counts in HBase. Let's say my HBase table already has a few entries:

ROW        COLUMN+CELL                                                                                                                                            
 hi        column=word:count, timestamp=1442685920109, value=\x00\x00\x00\x00\x00\x00\x00\x04                                                                     
 hello     column=word:count, timestamp=1442685220641, value=\x00\x00\x00\x00\x00\x00\x00\x01                                                                     
 where     column=word:count, timestamp=1442685920261, value=\x00\x00\x00\x00\x00\x00\x00\x01 

and I've received new words on the stream, so the RDD now holds an additional

hi,2
hello,5

which should result in new counts in HBase: 'hi' -> 6 and 'hello' -> 5.
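
For context, the stream_count table with its word column family has to exist before any increments run. A minimal one-time setup sketch using the classic HBase client API (HBaseAdmin) might look like this; the table and family names are the ones used in the code below:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin

val setupConf = HBaseConfiguration.create()
setupConf.set("hbase.zookeeper.quorum", "localhost:2181")
val admin = new HBaseAdmin(setupConf)
if (!admin.tableExists("stream_count")) {
  val desc = new HTableDescriptor(TableName.valueOf("stream_count"))
  desc.addFamily(new HColumnDescriptor("word"))   // family holding the word:count counter column
  admin.createTable(desc)
}
admin.close()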

I've gotten this to work with the following code:

wordCounts.foreachRDD ( rdd => {

      // One HBase connection per batch, opened on the driver.
      val hConf = HBaseConfiguration.create()
      hConf.set(TableOutputFormat.OUTPUT_TABLE, "stream_count")
      hConf.set("hbase.zookeeper.quorum", "localhost:2181")
      hConf.set("hbase.master", "localhost:60000")
      hConf.set("hbase.rootdir", "file:///tmp/hbase")

      val hTable = new HTable(hConf, "stream_count")
      // Pull the batch back to the driver and issue one Increment per word;
      // HBase adds the delta to whatever count is already stored.
      rdd.collect().foreach(record => {
          val increment = new Increment(Bytes.toBytes(record._1))
          increment.addColumn(Bytes.toBytes("word"), Bytes.toBytes("count"), record._2)
          hTable.increment(increment)
      })
      hTable.close()
    })
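
(For completeness, and not shown in the snippets above: once the foreachRDD output operation is wired up, the streaming context still has to be started, roughly as follows.)

ssc.start()
ssc.awaitTermination()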

Is there a better way to do this? I tried looking at Cloudera's SparkOnHBase, which has a bulkIncrement, but I was not able to make that work. I'm fairly new to big data/Spark; any pointers would be appreciated.


1 Answer

1
votes

I have a similar use case and tried logic similar to yours, but it doesn't perform well. If I don't set spark.default.parallelism, it uses the default of 2 executors. Even when I do set it, it's not as fast as saveAsNewAPIHadoopDataset, but that API (when TableOutputFormat is used) doesn't support Increment (only Put and Delete).

How are your performance numbers on this, and how did you achieve parallelism?
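
In case it helps, here's roughly how I'd expect the increments to be parallelised: doing them per partition on the executors instead of collect()-ing everything to the driver. This is an untested sketch that reuses the table and column names from your question:

wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Executor-side connection: the HBase configuration and HTable are not serializable,
    // so they have to be created inside the partition closure.
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost:2181")
    val table = new HTable(conf, "stream_count")
    partition.foreach { case (word, count) =>
      val increment = new Increment(Bytes.toBytes(word))
      increment.addColumn(Bytes.toBytes("word"), Bytes.toBytes("count"), count)
      table.increment(increment)
    }
    table.close()
  }
}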