0 votes

I'm new to Spark and HBase, but I need to link the two together. I tried the spark-hbase-connector library, but with spark-submit it doesn't work, even though no error is shown. I searched here and elsewhere for a similar problem or a tutorial but couldn't find one. Could anyone explain how to write to HBase from Spark Streaming, or recommend a tutorial or a book? Thank you in advance.

3
This question is off-topic on many levels. Please read up on how to ask questions on SO and which questions are in scope for being answered here. - eliasah

3 Answers

1 vote

What finally worked was:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

val hconf = HBaseConfiguration.create()
val hTable = new HTable(hconf, "mytab")
val thePut = new Put(Bytes.toBytes(row))
thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes("c1"), Bytes.toBytes(value))
hTable.put(thePut)
hTable.close()
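To drive this from Spark Streaming, the put logic can go inside foreachRDD/foreachPartition. Here is a minimal sketch, assuming a DStream[(String, String)] of (rowKey, value) pairs named stream (the stream, table name, and column names are placeholders, not from the original answer):

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

// stream is assumed to be a DStream[(String, String)] of (rowKey, value) pairs
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Build the connection inside the partition: HTable is not serializable,
    // so it must be created on the executor, not the driver.
    val hconf = HBaseConfiguration.create()
    val hTable = new HTable(hconf, "mytab")
    partition.foreach { case (rowKey, value) =>
      val thePut = new Put(Bytes.toBytes(rowKey))
      thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes("c1"), Bytes.toBytes(value))
      hTable.put(thePut)
    }
    hTable.close()
  }
}
```

Opening the table once per partition (rather than once per record) keeps connection overhead down while still avoiding the serialization problem of building it on the driver.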
0 votes

Here is some sample code using Splice Machine (open source) to store data into HBase via Spark Streaming and Kafka:

https://github.com/splicemachine/splice-community-sample-code/tree/master/tutorial-kafka-spark-streaming

We fought through this as well and know it can be a bit daunting.

0 votes

Here is the relevant code...

    LOG.info("************ SparkStreamingKafka.processKafka start");

    // Create the Spark application and set the name to KAFKA
    SparkConf sparkConf = new SparkConf().setAppName("KAFKA");

    // Create the Spark Streaming context with a 'numSeconds' second batch size
    jssc = new JavaStreamingContext(sparkConf, Durations.seconds(numSeconds));
    jssc.checkpoint(checkpointDirectory);

    LOG.info("zookeeper:" + zookeeper);
    LOG.info("group:" + group);
    LOG.info("numThreads:" + numThreads);
    LOG.info("numSeconds:" + numSeconds);

    // Map each topic to the number of threads that should consume it
    Map<String, Integer> topicMap = new HashMap<>();
    for (String topic : topics) {
        LOG.info("topic:" + topic);
        topicMap.put(topic, numThreads);
    }

    LOG.info("************ SparkStreamingKafka.processKafka about to read the KafkaUtils.createStream");
    // Use KafkaUtils to collect Kafka messages as a DStream of (key, message) pairs
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper, group, topicMap);

    // Convert each tuple into a single string; we want the second element (the message)
    JavaDStream<String> lines = messages.map(new TupleFunction());

    LOG.info("************ SparkStreamingKafka.processKafka about to do foreachRDD");
    // Process the messages on the queue and save them to the database
    lines.foreachRDD(new SaveRDDWithVTI());

    LOG.info("************ SparkStreamingKafka.processKafka prior to context.start");
    // Start the streaming context and block until it terminates
    jssc.start();
    jssc.awaitTermination();
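The TupleFunction used above is not shown in the snippet. A plausible sketch of what it does (the class name comes from the code; the body is an assumption) simply keeps the message half of each Kafka (key, message) pair:

    import org.apache.spark.api.java.function.Function;
    import scala.Tuple2;

    // Maps each Kafka (key, message) tuple to just the message string.
    public class TupleFunction implements Function<Tuple2<String, String>, String> {
        @Override
        public String call(Tuple2<String, String> tuple) {
            return tuple._2();
        }
    }

SaveRDDWithVTI would similarly be a VoidFunction over each RDD that writes its records to the database; its implementation is in the linked repository.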