0
votes

We have a text files of 100K records each and we need to read the file line by line and insert it's value into hbase. The file is '|' delimited.

Sample textFile example:

    SLNO|Name|City|Pincode
    1|ABC|Pune|400104
    2|BMN|Delhi|100065

Each column will have different column family. We are trying to implement this in Spark-Scala using HBase Bulk load. We came across this link suggesting bulk load : http://www.openkb.info/2015/01/how-to-use-scala-on-spark-to-load-data.html

With the below syntax for inserting into single column family.

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)

// Generate 10 sample data:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
    val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), 
"c1".getBytes(), "value_xxx".getBytes() )
    (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Directly bulk load to Hbase/MapRDB tables.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], 
classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())   

Can anyone advice on the bulk load insertion for multi-column family.

1

1 Answers

0
votes

Do have a look at rdd.saveAsNewAPIHadoopDataset, to insert the data into the hbase table.

def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("sparkToHive").enableHiveSupport().getOrCreate()
    import spark.implicits._

    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", "ip's")
    config.set("hbase.zookeeper.property.clientPort","2181")
    config.set(TableInputFormat.INPUT_TABLE, "tableName")

    val newAPIJobConfiguration1 = Job.getInstance(config)
    newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "tableName")
    newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val df: DataFrame  = Seq(("foo", "1", "foo1"), ("bar", "2", "bar1")).toDF("key", "value1", "value2")

    val hbasePuts= df.rdd.map((row: Row) => {
      val  put = new Put(Bytes.toBytes(row.getString(0)))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("value1"), Bytes.toBytes(row.getString(1)))
      put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("value2"), Bytes.toBytes(row.getString(2)))
      (new ImmutableBytesWritable(), put)
    })

    hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration())
    }

Ref : https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/