
I'm a new Spark user and I want to save my streaming data into multiple HBase tables. Saving my data into a single table was no problem, but I haven't been able to make it work with multiple tables.

I've tried to create multiple HTable instances, but then I noticed that this class can only communicate with a single HBase table.

Is there any way to do this?

This is where I try to create multiple HTables (of course it doesn't work, but it's the idea):

//HBase tables
val tableFull = "table1"
val tableCategoricalFiltered = "table2"

// Add local HBase conf
val conf1 = HBaseConfiguration.create()
val conf2 = HBaseConfiguration.create()

conf1.set(TableInputFormat.INPUT_TABLE, tableFull)
conf2.set(TableInputFormat.INPUT_TABLE, tableCategoricalFiltered)

//Opening Tables
val tableInputFeatures = new HTable(conf1, tableFull)
val tableCategoricalFilteredFeatures = new HTable(conf2, tableCategoricalFiltered)

And here is where I try to use them (it works with one HTable, though):

events.foreachRDD { event =>

    var j = 0
    event.foreach { feature =>

            if ( j <= 49 ) {
                    println("Feature " + j + " : " + featuresDic(j))
                    println(feature)

                    val p_full = new Put(new String("stream " + row_full).getBytes())
                    p_full.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
                    tableInputFeatures.put(p_full)

                    // note: these checks must be combined with &&, not ||
                    // (the || version is always true, so the else branch never runs)
                    if ( j != 26 && j != 27 && j != 28 && j != 29 ) {

                            val p_cat = new Put(new String("stream " + row_categorical).getBytes())
                            p_cat.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
                            tableCategoricalFilteredFeatures.put(p_cat)
                    }else{
                            j = 0
                            row_full = row_full + 1

                            println("Feature " + j + " : " + featuresDic(j))
                            println(feature)

                            val p_full = new Put(new String("stream " + row_full).getBytes())
                            p_full.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
                            tableInputFeatures.put(p_full)

                            val p_cat = new Put(new String("stream " + row_categorical).getBytes())
                            p_cat.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
                            tableCategoricalFilteredFeatures.put(p_cat)
                    }

                    j = j + 1
            }
    }
}
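As an aside, skip-lists of column indices like the one above are easier to get right with a `Set`. A minimal sketch (the indices 26-29 and the helper name are taken from / invented for the code above):

```scala
// Column indices to exclude from the categorical table (from the code above)
val categoricalExcluded = Set(26, 27, 28, 29)

// True when feature index j should be written to the categorical table
def keepForCategorical(j: Int): Boolean = !categoricalExcluded.contains(j)
```

This avoids the easy-to-miss `||` vs `&&` mistake entirely, since the membership test reads the same way the intent does.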
Your question would likely get more attention if you provided a code sample. – Eric Hauenstein
Thank you for your tip Eric, I've just provided my code :) – Relam

1 Answer


There's one approach I've confirmed works well: use the hbase-rdd library. https://github.com/unicredit/hbase-rdd

It's easy to use. Refer to https://github.com/unicredit/hbase-rdd#writing-to-hbase for usage.
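For the two-table case in the question, a write with hbase-rdd could look roughly like this. This is a sketch based on the library's README, not tested against your cluster; the RDD shape, the column family name "1", and the use of the question's table names are assumptions:

```scala
import org.apache.spark.rdd.RDD
import unicredit.spark.hbase._

// Implicit HBase configuration picked up by toHBase (reads hbase-site.xml)
implicit val hbaseConfig = HBaseConfig()

// Assumed shape: row key -> (column qualifier -> value)
val fullRows: RDD[(String, Map[String, String])] = ???
val categoricalRows: RDD[(String, Map[String, String])] = ???

// Each RDD can target a different table, so two tables means two calls
fullRows.toHBase("table1", "1")
categoricalRows.toHBase("table2", "1")
```

The key point is that the table name is a parameter of each write, so nothing ties you to a single table the way a shared HTable instance does.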

You can also try MultiTableOutputFormat; I've confirmed it works well with traditional MapReduce, though I haven't used it from Spark yet.
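With MultiTableOutputFormat, the destination table is chosen per record by the key, so a single write can feed both tables. A rough Spark sketch (untested from Spark, as noted above; the table names and the "1" qualifier come from the question, while the record shape and the boolean flag are assumptions):

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD

val conf = HBaseConfiguration.create()
val job = Job.getInstance(conf)
job.setOutputFormatClass(classOf[MultiTableOutputFormat])

// Assumed input: (rowKey, columnFamily, value, alsoWriteToCategorical)
val rows: RDD[(String, String, String, Boolean)] = ???

// The ImmutableBytesWritable key names the destination table for each Put
val puts = rows.flatMap { case (rowKey, family, value, toCategorical) =>
  val put = new Put(Bytes.toBytes(rowKey))
  put.addColumn(Bytes.toBytes(family), Bytes.toBytes("1"), Bytes.toBytes(value))
  val full = (new ImmutableBytesWritable(Bytes.toBytes("table1")), put)
  if (toCategorical)
    Seq(full, (new ImmutableBytesWritable(Bytes.toBytes("table2")), put))
  else
    Seq(full)
}

puts.saveAsNewAPIHadoopDataset(job.getConfiguration)
```

Note this uses the newer `addColumn` API; on older HBase versions (matching the `HTable`/`put.add` style in the question) the call is `put.add(family, qualifier, value)` instead.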