I'm new to Spark and HBase, but I need to link the two together. I tried the spark-hbase-connector library, but with spark-submit it doesn't work, even though no error is shown. I searched here and elsewhere for a similar problem or a tutorial but couldn't find one. Could anyone explain how to write to HBase from Spark Streaming, or recommend a tutorial or a book? Thank you in advance.
3 Answers
Here is the relevant code...
LOG.info("************ SparkStreamingKafka.processKafka start");

// Create the Spark application and set the name to KAFKA
SparkConf sparkConf = new SparkConf().setAppName("KAFKA");

// Create the Spark Streaming context with a 'numSeconds' second batch size
jssc = new JavaStreamingContext(sparkConf, Durations.seconds(numSeconds));
jssc.checkpoint(checkpointDirectory);

LOG.info("zookeeper:" + zookeeper);
LOG.info("group:" + group);
LOG.info("numThreads:" + numThreads);
LOG.info("numSeconds:" + numSeconds);

Map<String, Integer> topicMap = new HashMap<>();
for (String topic : topics) {
    LOG.info("topic:" + topic);
    topicMap.put(topic, numThreads);
}

LOG.info("************ SparkStreamingKafka.processKafka about to read the KafkaUtils.createStream");

// 2. Use KafkaUtils to collect Kafka messages
JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper, group, topicMap);

// Convert each tuple into a single string; we want the second element of the tuple
JavaDStream<String> lines = messages.map(new TupleFunction());

LOG.info("************ SparkStreamingKafka.processKafka about to do foreachRDD");

// Process the messages on the queue and save them to the database
lines.foreachRDD(new SaveRDDWithVTI());

LOG.info("************ SparkStreamingKafka.processKafka prior to context.start");

// Start the context
jssc.start();
jssc.awaitTermination();
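To address the original question of writing to HBase, the function passed to `foreachRDD` (like `SaveRDDWithVTI` above) typically iterates over each RDD partition and opens one HBase connection per partition, so that the connection is created on the executor rather than serialized from the driver. Here is a minimal sketch using the standard HBase 1.x client API; the table name `messages`, column family `cf`, qualifier `value`, and the `rowKeyFor` row-key scheme are assumptions for illustration, not part of the original code:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.streaming.api.java.JavaDStream;

public class HBaseWriter {

    // Hypothetical row-key scheme: message hash plus length, to spread
    // writes across regions instead of hot-spotting on one key prefix.
    static String rowKeyFor(String message) {
        return Integer.toHexString(message.hashCode()) + "-" + message.length();
    }

    static void saveToHBase(JavaDStream<String> lines) {
        lines.foreachRDD(rdd -> {
            rdd.foreachPartition(partition -> {
                // Create the connection inside the partition so it is built
                // on the executor and never serialized from the driver.
                Configuration conf = HBaseConfiguration.create();
                try (Connection connection = ConnectionFactory.createConnection(conf);
                     Table table = connection.getTable(TableName.valueOf("messages"))) {
                    while (partition.hasNext()) {
                        String message = partition.next();
                        Put put = new Put(Bytes.toBytes(rowKeyFor(message)));
                        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"),
                                      Bytes.toBytes(message));
                        table.put(put);
                    }
                }
            });
        });
    }
}
```

`HBaseConfiguration.create()` picks up `hbase-site.xml` from the classpath, which is usually how spark-submit jobs find the cluster. Note that `Put.addColumn` is the HBase 1.0+ name; on older 0.9x clients the equivalent call is `put.add`.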