I'm reading data coming from Kafka (100,000 lines per second) using Spark Structured Streaming, and I'm trying to insert all the data into HBase.

I'm on Cloudera Hadoop 2.6 and I'm using Spark 2.3.

I tried something like what I've seen here.

eventhubs.writeStream
 .foreach(new MyHBaseWriter[Row])
 .option("checkpointLocation", checkpointDir)
 .start()
 .awaitTermination()

MyHBaseWriter looks like this:

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row
import scala.util.parsing.json.JSON

class MyHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
    override val tableName: String = "hbase-table-name"

    override def toPut(record: Row): Put = {
        // Parse the JSON payload held in the first column
        val data = JSON.parseFull(record.getString(0)).asInstanceOf[Option[Map[String, Object]]]
        val fields = data.getOrElse(Map.empty[String, Object])
        val key = fields("key") + ""
        // "val" is a reserved word in Scala, so the variable is named value
        val value = fields("val") + ""

        val p = new Put(Bytes.toBytes(key))
        //Add columns ...
        p.addColumn(Bytes.toBytes(columnFamaliyName), Bytes.toBytes(columnName), Bytes.toBytes(value))

        p
    }
}

And the HBaseForeachWriter class looks like this :

trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String

  def pool: Option[ExecutorService] = None

  def user: Option[User] = None

  private var hTable: Table = _
  private var connection: Connection = _


  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }

  def createConnection(): Connection = {
    // I create HBase Connection Here
  }

  def getHTable(connection: Connection): Table = {
    connection.getTable(TableName.valueOf(Variables.getTableName()))
  }

  override def process(record: RECORD): Unit = {
    val put = toPut(record)
    hTable.put(put)
  }

  override def close(errorOrNull: Throwable): Unit = {
    hTable.close()
    connection.close()
  }

  def toPut(record: RECORD): Put
}

So here I'm doing a put line by line. Even if I allow 20 executors with 4 cores each, the data is not inserted into HBase immediately. What I need is a bulk load, but I'm struggling because everything I find on the internet does it with RDDs and Map/Reduce.

1 Answer

What I understand is that the rate of record ingestion into HBase is slow. I have a few suggestions for you.

1) hbase.client.write.buffer

The property below may help you.

hbase.client.write.buffer

Description: Default size of the BufferedMutator write buffer in bytes. A bigger buffer takes more memory (on both the client and server side, since the server instantiates the passed write buffer to process it), but a larger buffer size reduces the number of RPCs made. For an estimate of server-side memory used, evaluate hbase.client.write.buffer * hbase.regionserver.handler.count.

Default: 2097152 (around 2 MB)
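
As a rough worked example of the estimate in the description above, the arithmetic can be written out in plain Scala. The handler count of 30 is only an assumed value for illustration; check hbase.regionserver.handler.count on your own cluster.

```scala
// Rough server-side memory estimate:
// hbase.client.write.buffer * hbase.regionserver.handler.count
object WriteBufferEstimate {
  def estimateBytes(writeBufferBytes: Long, handlerCount: Int): Long =
    writeBufferBytes * handlerCount

  def main(args: Array[String]): Unit = {
    val defaultBuffer = 2097152L          // 2 MB, the documented default
    val largerBuffer  = 10L * 1024 * 1024 // 10 MB
    val handlers      = 30                // assumed value, not taken from the question

    println(estimateBytes(defaultBuffer, handlers)) // prints 62914560  (60 MB)
    println(estimateBytes(largerBuffer, handlers))  // prints 314572800 (300 MB)
  }
}
```

So going from 2 MB to 10 MB multiplies the estimated server-side footprint by five as well, which is worth weighing against the drop in RPC count.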

I prefer foreachBatch (see the Spark docs; it is roughly the Structured Streaming counterpart of foreachPartition in Spark core) rather than foreach.
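
A minimal sketch of what a foreachBatch sink could look like. Note that foreachBatch was added in Spark 2.4, so it is not available on the Spark 2.3 setup from the question without an upgrade. The object name HBaseBatchSink, the columns "rowKey" and "value", the column family "c", and the chunk size of 1000 are all assumptions made for illustration.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{DataFrame, Row}
import scala.collection.JavaConverters._

object HBaseBatchSink {
  // Hypothetical layout: string columns "rowKey" and "value", column family "c".
  def toPut(row: Row): Put = {
    val p = new Put(Bytes.toBytes(row.getAs[String]("rowKey")))
    p.addColumn(Bytes.toBytes("c"), Bytes.toBytes("value"),
      Bytes.toBytes(row.getAs[String]("value")))
    p
  }

  // Called once per micro-batch. Each partition opens one connection
  // and sends its rows as lists of puts instead of one put per row.
  val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
    batch.rdd.foreachPartition { rows =>
      val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
      val table = connection.getTable(TableName.valueOf("hbase-table-name"))
      try {
        rows.map(toPut).grouped(1000).foreach(chunk => table.put(chunk.asJava))
      } finally {
        table.close()
        connection.close()
      }
    }
  }
}
```

With Spark 2.4 or later it would be wired in as eventhubs.writeStream.foreachBatch(HBaseBatchSink.writeBatch).option("checkpointLocation", checkpointDir).start(). Declaring writeBatch as an explicitly typed function value keeps the call on the Scala overload of foreachBatch.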

Also, in your HBase writer that extends ForeachWriter:

in the open method, initialize an ArrayList of puts; in process, add each put to the list; in close, call table.put(listOfPuts), and then reset the list once the table has been updated.
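
The open/process/close pattern just described could be sketched roughly as follows. This is not the writer from the question but a hypothetical variant: the class name BatchingHBaseWriter and the batchSize parameter are assumptions, and the connection is created from the default HBaseConfiguration rather than the question's own createConnection().

```scala
import java.util.{ArrayList => JArrayList}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.spark.sql.ForeachWriter

// Hypothetical batching writer: collects puts and sends them in lists.
abstract class BatchingHBaseWriter[RECORD](tableName: String, batchSize: Int = 1000)
    extends ForeachWriter[RECORD] {

  private var connection: Connection = _
  private var table: Table = _
  private var puts: JArrayList[Put] = _

  def toPut(record: RECORD): Put

  override def open(partitionId: Long, version: Long): Boolean = {
    connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    table = connection.getTable(TableName.valueOf(tableName))
    puts = new JArrayList[Put](batchSize)
    true
  }

  override def process(record: RECORD): Unit = {
    puts.add(toPut(record))
    // Flush early so the list does not grow without bound on large partitions.
    if (puts.size() >= batchSize) flush()
  }

  private def flush(): Unit = {
    if (!puts.isEmpty) {
      table.put(puts) // one batched call instead of one call per row
      puts.clear()    // reset the list after the table has been updated
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    try {
      // Only write the remainder if the partition finished without an error.
      if (errorOrNull == null && puts != null) flush()
    } finally {
      if (table != null) table.close()
      if (connection != null) connection.close()
    }
  }
}
```

Flushing inside process once batchSize is reached is a design choice added here; flushing only in close, as described above, also works but holds the whole partition in memory.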

What this does: once the write buffer mentioned above fills up (2 MB by default), it is flushed into the HBase table. Until then, the records won't reach the HBase table.

You can increase that to 10 MB or so. This way the number of RPCs is reduced, and a large chunk of data is flushed into the HBase table at once.

When the write buffer fills up, a flushCommits into the HBase table is triggered.
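
In the HBase 1.x client the write buffer is exposed through BufferedMutator, so raising it to 10 MB might look like the sketch below. The object and method names are hypothetical, and the connection uses the default HBaseConfiguration; treat it as an illustration of the API rather than a tuned setup.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{BufferedMutatorParams, ConnectionFactory, Put}

object BufferedWriteExample {
  val TenMb: Long = 10L * 1024 * 1024

  // Parameters for a mutator whose client-side write buffer is 10 MB.
  def params(tableName: String): BufferedMutatorParams =
    new BufferedMutatorParams(TableName.valueOf(tableName)).writeBufferSize(TenMb)

  def writeAll(tableName: String, puts: Iterator[Put]): Unit = {
    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    try {
      val mutator = connection.getBufferedMutator(params(tableName))
      try {
        // mutate() only buffers; data is sent when the buffer fills up.
        puts.foreach(p => mutator.mutate(p))
        mutator.flush() // push whatever is still buffered
      } finally {
        mutator.close()
      }
    } finally {
      connection.close()
    }
  }
}
```

The same size can also be set cluster-wide through hbase.client.write.buffer in the client configuration instead of per mutator.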

Example code : in my answer

2) Switch off the WAL. You can switch off the WAL (write-ahead log; the danger is that there is no recovery), and it will speed up writes, if you don't need to recover the data.

Note: if you are using Solr or Cloudera Search on HBase tables, you should not turn it off, since Solr works off the WAL. If you switch it off, Solr indexing won't work. This is a common mistake many of us make.

How to switch off: https://hbase.apache.org/1.1/apidocs/org/apache/hadoop/hbase/client/Put.html#setWriteToWAL(boolean)
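
In the HBase 1.x client API, setWriteToWAL(boolean) is marked deprecated in favour of setDurability(Durability), so a per-put version could look like this minimal sketch. The object and method names are hypothetical.

```scala
import org.apache.hadoop.hbase.client.{Durability, Put}
import org.apache.hadoop.hbase.util.Bytes

object SkipWalExample {
  // Builds a Put that bypasses the write-ahead log.
  // Data written this way is lost if the region server crashes before a flush.
  def putWithoutWal(rowKey: String, family: String, qualifier: String, value: String): Put = {
    val p = new Put(Bytes.toBytes(rowKey))
    p.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
    p.setDurability(Durability.SKIP_WAL)
    p
  }
}
```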

As I mentioned, a list of puts is a good way. This is the old way of doing it before Structured Streaming (foreachPartition with a list of puts). An example is below; foreachPartition operates on each partition, not on every row.

import java.util
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.DataFrame

def writeHbase(mydataframe: DataFrame) = {
      val columnFamilyName: String = "c"
      // Runs once per partition, so each partition builds one list of puts
      mydataframe.foreachPartition(rows => {
        val puts = new util.ArrayList[ Put ]
        rows.foreach(row => {
          val key = row.getAs[ String ]("rowKey")
          val p = new Put(Bytes.toBytes(key))
          val columnX = row.getAs[ Double ]("x")
          val columnY = row.getAs[ Long ]("y")
          p.addColumn(
            Bytes.toBytes(columnFamilyName),
            Bytes.toBytes("x"),
            Bytes.toBytes(columnX)
          )
          p.addColumn(
            Bytes.toBytes(columnFamilyName),
            Bytes.toBytes("y"),
            Bytes.toBytes(columnY)
          )
          puts.add(p)
        })
        // Write the whole partition's puts in a single call
        HBaseUtil.putRows(hbaseZookeeperQuorum, hbaseTableName, puts)
      })
    }
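
HBaseUtil.putRows is a helper that is not shown in this answer. A hypothetical stand-in with the same call shape, assuming it takes the ZooKeeper quorum, the table name, and a java.util.List of puts, might be sketched as:

```scala
import java.util.{List => JList}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}

// Hypothetical stand-in for the HBaseUtil helper; the real one is not shown.
object HBaseUtil {
  def putRows(zookeeperQuorum: String, tableName: String, puts: JList[Put]): Unit = {
    // Nothing to write for an empty partition, so skip connecting at all.
    if (!puts.isEmpty) {
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", zookeeperQuorum)
      val connection = ConnectionFactory.createConnection(conf)
      try {
        val table = connection.getTable(TableName.valueOf(tableName))
        try {
          table.put(puts) // the whole list goes out in one batched call
        } finally {
          table.close()
        }
      } finally {
        connection.close()
      }
    }
  }
}
```

Opening a connection per call is simple but relatively expensive; a real helper might cache one connection per executor instead.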

To sum up:

What I feel is that we need to understand the psychology of Spark and HBase to make them an effective pair.