3
votes

I am inserting into HBase using Spark, but it's slow: 60,000 records take 2-3 minutes, and I have about 10 million records to save.

object WriteToHbase extends Serializable {
    def main(args: Array[String]) {
        val csvRows: RDD[Array[String]] = ...
        val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
        val usersRDD = csvRows.map(row => {
            new UserTable(row(0), row(1), row(2), row(9), row(10), row(11))
        })
        processUsers(sc, usersRDD, dateFormatter)
    }
}

def processUsers(sc: SparkContext, usersRDD: RDD[UserTable], dateFormatter: DateTimeFormatter): Unit = {

    usersRDD.foreachPartition(part => {
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, tablename)

        part.foreach(userRow => {
            val id = userRow.id
            val name = userRow.name
            val date1 = dateFormatter.parseDateTime(userRow.date1)
            val hRow = new Put(Bytes.toBytes(id))
            hRow.add(cf, q, Bytes.toBytes(date1))
            hRow.add(cf, q, Bytes.toBytes(name))
            ...
            table.put(hRow)
        })
        table.flushCommits()
        table.close()
    })
}

These are the options I am using with spark-submit:

--num-executors 2 --driver-memory 2G --executor-memory 2G --executor-cores 2 

3 Answers

3
votes

It's slow because the implementation doesn't exploit data locality: a piece of the Spark RDD that lives on one server may have to be shipped to an HBase RegionServer running on another server.

Currently there is no Spark RDD operation that uses the HBase data store in an efficient manner.

0
votes

HTable has a batch API: instead of sending puts one at a time, you can send them in packets of 100-500 puts, which should speed things up a bit. The call returns an individual result for every operation, so you can check for failed puts if you want.

public void batch(List<? extends Row> actions, Object[] results)

https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/HTable.html#batch%28java.util.List,%20java.lang.Object[]%29
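
For example, a minimal Scala sketch of batching inside the question's foreachPartition might look like this (the table name "users", column family "d" and qualifier "name" are placeholders, not values from the original code):

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

usersRDD.foreachPartition(part => {
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "users")            // placeholder table name

    // send puts in packets of ~500 instead of one table.put() per record
    part.grouped(500).foreach(chunk => {
        val puts = chunk.map(userRow => {
            val p = new Put(Bytes.toBytes(userRow.id))
            p.add(Bytes.toBytes("d"), Bytes.toBytes("name"), Bytes.toBytes(userRow.name))
            p
        })
        val results = new Array[AnyRef](puts.size)   // one slot per action
        table.batch(puts.asJava, results)            // inspect results to find failed puts
    })
    table.close()
})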

0
votes

You should look at how the incoming data is distributed across the Spark job. Rather than doing everything in foreachPartition, also consider transformations such as map and mapToPair, and evaluate the whole DAG lifecycle to see where you can save time.

After that, based on the parallelism achieved, you can call Spark's saveAsNewAPIHadoopDataset action to write into HBase faster and in parallel, like:

JavaPairRDD<ImmutableBytesWritable, Put> yourFinalRDD = yourRDD.mapToPair(...); // build (rowkey, Put) pairs
yourFinalRDD.saveAsNewAPIHadoopDataset(yourHBaseConfiguration);

Note: yourHBaseConfiguration should be a singleton, so that a single object per executor node is shared between the tasks.

Kindly let me know if this pseudo-code doesn't work for you or if you run into any difficulty with it.
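
For reference, here is a minimal Scala sketch of this approach built on the question's usersRDD (again, the table name "users", column family "d" and qualifier "name" are assumed placeholders):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val hConf = HBaseConfiguration.create()
hConf.set(TableOutputFormat.OUTPUT_TABLE, "users")   // placeholder table name
val job = Job.getInstance(hConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// build (rowkey, Put) pairs on the executors; Spark writes each partition in parallel
val hbasePuts = usersRDD.map(userRow => {
    val put = new Put(Bytes.toBytes(userRow.id))
    put.add(Bytes.toBytes("d"), Bytes.toBytes("name"), Bytes.toBytes(userRow.name))
    (new ImmutableBytesWritable(Bytes.toBytes(userRow.id)), put)
})
hbasePuts.saveAsNewAPIHadoopDataset(job.getConfiguration)

This avoids managing HTable instances by hand: each task gets its own TableOutputFormat record writer, which buffers and flushes the puts for its partition.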