I have a use case where I'm using Spark Streaming with Kafka to listen to a topic and count all the words and their number of occurrences. I want to store the word counts in HBase every time an RDD is created from the DStream.
Here's the code I'm using to read the topic, which works just fine and gives me an RDD of (String, Long) pairs:
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val broker = "localhost:9092"
val zk = "localhost:2181"
val topic = "sparktest"
val sparkConf = new SparkConf().setAppName("KafkaHBaseWordCount").setMaster("local[2]")
// Building the StreamingContext straight from the conf avoids creating a second
// SparkContext, so spark.driver.allowMultipleContexts is not needed
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaConf = Map(
  "metadata.broker.list" -> broker,
  "zookeeper.connect" -> zk,
  "group.id" -> "kafka-spark-streaming-example",
  "zookeeper.connection.timeout.ms" -> "1000")
// One receiver on the topic; message values are decoded as Strings
val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
  ssc, kafkaConf, Map(topic -> 1),
  StorageLevel.MEMORY_ONLY_SER).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
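(A quick sanity check while developing: printing the DStream shows the per-batch counts on the console before any HBase writes are wired up.)

// Prints up to the first ten (word, count) pairs of every 10-second batch
wordCounts.print()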
I would now like to update the word counts in HBase. Let's say my HBase table already has a few entries:
ROW      COLUMN+CELL
hi       column=word:count, timestamp=1442685920109, value=\x00\x00\x00\x00\x00\x00\x00\x04
hello    column=word:count, timestamp=1442685220641, value=\x00\x00\x00\x00\x00\x00\x00\x01
where    column=word:count, timestamp=1442685920261, value=\x00\x00\x00\x00\x00\x00\x00\x01
and I've received new words on the stream, so the RDD now holds the additional entries
hi,2
hello,5
which should result in new counts in HBase: 'hi' -> 6 and 'hello' -> 6.
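(Side note for anyone else decoding the shell output: those \x00...\x04 cell values are plain 8-byte big-endian longs, which is exactly the representation Increment reads and writes. A minimal check, assuming the same stream_count table used below:)

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, HTable}
import org.apache.hadoop.hbase.util.Bytes

// Read the 'hi' counter back and decode its 8 bytes into a Long
val table = new HTable(HBaseConfiguration.create(), "stream_count")
val result = table.get(new Get(Bytes.toBytes("hi")))
val count = Bytes.toLong(result.getValue(Bytes.toBytes("word"), Bytes.toBytes("count")))
println(s"hi -> $count") // prints 4 for the sample row above
table.close()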
I've gotten this to work with the following code:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Increment}
import org.apache.hadoop.hbase.util.Bytes

wordCounts.foreachRDD { rdd =>
  // This closure runs on the driver once per batch
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "localhost:2181")
  conf.set("hbase.master", "localhost:60000")
  conf.set("hbase.rootdir", "file:///tmp/hbase")
  val hTable = new HTable(conf, "stream_count")
  // collect() pulls the entire batch back to the driver before writing
  rdd.collect().foreach { record =>
    val increment = new Increment(Bytes.toBytes(record._1))
    // Atomically adds this batch's count to the stored 8-byte counter
    increment.addColumn(Bytes.toBytes("word"), Bytes.toBytes("count"), record._2)
    hTable.increment(increment)
  }
  hTable.close()
}

ssc.start()
ssc.awaitTermination()
Is there a better way to do this? I tried looking at Cloudera's SparkOnHBase, which has a bulkIncrement, but I was not able to make that work. I'm fairly new to big data/Spark, so any pointers would be appreciated.
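For reference, the alternative I've seen suggested most often is foreachPartition instead of collect(): each executor then writes its own slice of the RDD, and a connection is opened once per partition rather than shipping every record to the driver. A rough, untested sketch of what I think that would look like (same stream_count table and word:count column as above):

wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Runs on the executors: one configuration/table per partition, not per record
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost:2181")
    val hTable = new HTable(conf, "stream_count")
    partition.foreach { case (word, count) =>
      val increment = new Increment(Bytes.toBytes(word))
      increment.addColumn(Bytes.toBytes("word"), Bytes.toBytes("count"), count)
      hTable.increment(increment)
    }
    hTable.close()
  }
}

If that's the right direction I'm happy to go with it, but I'd still like to know whether SparkOnHBase's bulkIncrement would be the cleaner option once set up properly.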