1 vote

What is the best way to compare received data in Spark Streaming to existing data in HBase?

We receive data from Kafka as a DStream, and before writing it to HBase we must read the existing HBase data for the received keys, do some calculation (new vs. old data per key) and then write the result back to HBase.

So if I receive a record (key, value_new), I must fetch (key, value_old) from HBase so that I can compare value_new against value_old.

So the logic would be:

DStream from Kafka -> Query HBase by DStream keys -> Some calculations -> Write to HBase

My "naïve" approach was to use Phoenix Spark Connector to read and left join to new data based on key as a way to filter out keys not in the current micro-batch. So I would get a DF with (key, value_new, value_old) and from here I can compare inside partition.

JavaInputDStream<ConsumerRecord<String, String>> kafkaDStream = KafkaUtils.createDirectStream(...);

// use foreachRDD in order to use Phoenix DF API
kafkaDStream.foreachRDD((rdd, time) -> {
        // Get the singleton instance of SparkSession
        SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

        JavaPairRDD<String, String> keyValueRdd = rdd.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

        // TOO SLOW FROM HERE: the whole Phoenix table is read on every micro-batch and only filtered by the join afterwards
        Dataset<Row> oldDataDF = spark
                .read()
                .format("org.apache.phoenix.spark")
                .option("table", PHOENIX_TABLE)
                .option("zkUrl", PHOENIX_ZK)
                .load()
                .withColumnRenamed("JSON", "JSON_OLD")
                .withColumnRenamed("KEY_ROW", "KEY_OLD");

        Dataset<Row> newDF = toPhoenixTableDF(spark, keyValueRdd); // helper method to convert the RDD to a DataFrame (see note below)

        Dataset<Row> newAndOld = newDF.join(oldDataDF, oldDataDF.col("KEY_OLD").equalTo(newDF.col("KEY_ROW")), "left");

        /// do some calcs based on new vs old values and then write to Hbase ...

});

PROBLEM: fetching data from HBase for the list of keys in the received DStream RDD with the approach above is too slow for streaming.

What would be a performant way to do this?


Side note: the method toPhoenixTableDF is just a helper that converts the received RDD to a DataFrame:

    private static Dataset<Row> toPhoenixTableDF(SparkSession spark, JavaPairRDD<String, String> keyValueRdd) {
        // wrap each (key, json) pair in a bean so Spark can infer the DataFrame schema
        JavaRDD<phoenixTableRecord> tmp = keyValueRdd.map(x -> {
            phoenixTableRecord record = new phoenixTableRecord();
            record.setKEY_ROW(x._1);
            record.setJSON(x._2);
            return record;
        });

        return spark.createDataFrame(tmp, phoenixTableRecord.class);
    }
This question is too broad in my opinion. Try to focus on what exactly is not working and explain why. Asking about general architecture will lead to rather opinion-based answers. – mike
Anyway, I can say that we use a Spark Streaming (DStreams) application which gets a key from Kafka, looks the key up in HBase and writes the processed message to Hive. This is working perfectly fine in production. Let me know where your concerns are and I might be able to help. – mike
Hey @mike, thanks for the tip. I edited the question to make it clearer and focused on the problem rather than on generic architecture. My problem is to do something similar to what you did. Can you share more considerations and code, please? – YFl
You mentioned "getting data from HBase based on a list of keys from received DStream RDD using the above approach is too slow for streaming." However, in our case we are doing exactly this, and based on our requirements it is not too slow. – mike
... using the plain HBase client. – mike
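
For reference, a minimal sketch of what such a per-micro-batch lookup with the plain HBase client could look like. The table name "MY_TABLE", the column family/qualifier "cf"/"json" and the merge(...) helper are hypothetical placeholders, not anything from the comments above:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

kafkaDStream.foreachRDD((rdd, time) -> {
    rdd.foreachPartition(records -> {
        Configuration conf = HBaseConfiguration.create();
        // NOTE: this opens one connection per partition per batch; in practice you would
        // reuse/pool it, which is what the HBaseContext from the answer below does for you.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("MY_TABLE"))) {

            List<ConsumerRecord<String, String>> batch = new ArrayList<>();
            records.forEachRemaining(batch::add);

            // one batched multi-get for the keys of this partition instead of a full table read + join
            List<Get> gets = new ArrayList<>(batch.size());
            for (ConsumerRecord<String, String> r : batch) {
                gets.add(new Get(Bytes.toBytes(r.key())));
            }
            Result[] oldRows = table.get(gets);

            List<Put> puts = new ArrayList<>(batch.size());
            for (int i = 0; i < batch.size(); i++) {
                byte[] oldJson = oldRows[i].isEmpty()
                        ? null
                        : oldRows[i].getValue(Bytes.toBytes("cf"), Bytes.toBytes("json"));
                String newJson = merge(batch.get(i).value(), oldJson); // merge(...) = your new-vs-old calculation
                Put put = new Put(Bytes.toBytes(batch.get(i).key()));
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("json"), Bytes.toBytes(newJson));
                puts.add(put);
            }
            table.put(puts);
        }
    });
});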

1 Answer

0 votes

The solution is to use the Spark HBase connector for batched gets and puts.

You can find the source code with good examples here: https://github.com/apache/hbase-connectors/tree/master/spark, as well as in the HBase documentation (Spark section).

This library uses the plain Java/Scala HBase API, so you keep control over the operations, but it manages the connection pool for you through an HBaseContext object broadcast to the executors, which is really great. It provides simple wrappers for common HBase operations, but if needed you can just use its foreachPartition/mapPartitions methods and keep control over the logic while still having access to a managed connection.
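
For illustration, a rough sketch of how the batched read could look with the connector's JavaHBaseContext, modeled on the JavaHBaseBulkGetExample in that repository. MY_TABLE, the cf/json column, the batch size of 1000 and the javaSparkContext variable are placeholders/assumptions to adapt to your schema and connector version:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import scala.Tuple2;

Configuration hbaseConf = HBaseConfiguration.create();
// created once on the driver; the connection handling is broadcast to the executors
JavaHBaseContext hbaseContext = new JavaHBaseContext(javaSparkContext, hbaseConf);

kafkaDStream.foreachRDD((rdd, time) -> {
    JavaPairRDD<String, String> newData = rdd.mapToPair(r -> new Tuple2<>(r.key(), r.value()));

    // batched GETs, executed per partition through the managed connection
    JavaRDD<Tuple2<String, String>> oldRows = hbaseContext.bulkGet(
            TableName.valueOf("MY_TABLE"),
            1000,                                   // number of Gets per round trip
            newData.keys(),
            key -> new Get(Bytes.toBytes(key)),     // build a Get per incoming key
            result -> new Tuple2<>(
                    Bytes.toString(result.getRow()),
                    Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("json")))));

    // keys that do not exist yet in HBase come back as empty Results (null row)
    JavaPairRDD<String, String> oldData =
            JavaPairRDD.fromJavaRDD(oldRows.filter(t -> t._1() != null));

    // join new vs old per key, do the calculation, then write back,
    // e.g. with hbaseContext.bulkPut(...) or inside hbaseContext.foreachPartition(...)
    newData.leftOuterJoin(oldData);
});

If I remember correctly, the connector also ships streaming variants of these helpers (streamBulkGet, streamBulkPut) that work on the DStream directly, so you may be able to skip the explicit foreachRDD if that fits your pipeline better.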