I want to insert streaming data into HBase. This is my code:

val tableName = "streamingz"
val conf = HBaseConfiguration.create()
conf.addResource(new Path("file:///opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/etc/hbase/conf.dist/hbase-site.xml"))
conf.set(TableInputFormat.INPUT_TABLE, tableName)

val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
    print("-----------------------------------------------------------------------------------------------------------")
    val tableDesc = new HTableDescriptor(tableName)
    tableDesc.addFamily(new HColumnDescriptor("z1".getBytes()))
    tableDesc.addFamily(new HColumnDescriptor("z2".getBytes()))
    admin.createTable(tableDesc)
} else {
    print("Table already exists!!--------------------------------------------------------------------------------------")
}
val ssc = new StreamingContext(sc, Seconds(10))
val topicSet = Set("fluxAstellia")
val kafkaParams = Map[String, String]("metadata.broker.list" -> "10.32.201.90:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
val lines = stream.map(_._2).map(_.split(" ", -1)).foreachRDD(rdd => {
    if (!rdd.partitions.isEmpty) {
        val myTable = new HTable(conf, tableName)
        rdd.map(rec => {
            var put = new Put(rec._1.getBytes)
            put.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(rec._2))
            myTable.put(put)
        }).saveAsNewAPIHadoopDataset(conf)
        myTable.flushCommits()
    } else {
        println("rdd is empty")
    }

})


ssc.start()
ssc.awaitTermination()

}
}

I got this error:

:66: error: value _1 is not a member of Array[String]
       var put = new Put(rec._1.getBytes)

I'm a beginner, so I don't know how to fix this error. I also have a question:

Where exactly should I create the table: outside the streaming process or inside it?

Thank you

1 Answer

Your error is on the line var put = new Put(rec._1.getBytes). The _n accessors (_1, _2, ...) are only available on tuples, including the key/value pairs you get when iterating over a Map (_1 for the key, _2 for the value).
rec is an Array[String], produced by splitting each string in the stream on spaces. Array elements are accessed by index, so if you want the first element, write var put = new Put(rec(0).getBytes). Likewise, the next line becomes put.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(rec(1))).
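
To make the difference concrete, here is a minimal sketch in plain Scala, with no Spark or HBase dependencies. The sample line "row42 alice" and the object name SplitAccessDemo are made up for illustration, since the question does not show what the Kafka messages look like.

```scala
object SplitAccessDemo {
  // Hypothetical sample record; the real message format is not shown in the question.
  val line: String = "row42 alice"

  // split returns Array[String], which is indexed with parentheses, not with _1/_2.
  val rec: Array[String] = line.split(" ", -1)
  val rowKey: String = rec(0)
  val name: String = rec(1)

  // If tuple-style access is preferred, build the tuple explicitly first.
  val pair: (String, String) = (rec(0), rec(1))

  def main(args: Array[String]): Unit = {
    println(s"rowKey=$rowKey, name=$name")
    println(s"pair._1=${pair._1}, pair._2=${pair._2}")
  }
}
```

Note that rec(1) throws an ArrayIndexOutOfBoundsException if a line contains no space, so it may be worth checking rec.length before indexing real data.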