0 votes

Working with Spark Structured Streaming.

I am working on code where I need to do a lot of lookups on the data. The lookups are quite complex and just don't translate well to joins.

For example: look up field A in table B and fetch a value; if found, look up that value in another table; if not found, look up some other value C in table D; and so on.

I managed to write these lookups using HBase, and functionally it works fine. I wrote UDFs for each of these lookups; a very simple one might be:

import org.apache.spark.sql.functions.udf
import org.apache.hadoop.hbase.util.Bytes

val someColFunc = udf { (code: String) =>
  // look up 'code' in the lookup table and return the stored value, or null if absent
  val value = HbaseObject.table.getRow("lookupTable", code, "cf", "value1")
  if (value != null)
    Bytes.toString(value)
  else
    null
}
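A chained lookup like the one I described above follows the same shape. Roughly like this (the table, column family and qualifier names here are only placeholders):

// Sketch of the chained lookup described above; table/column names are placeholders
val chainedLookupFunc = udf { (a: String, c: String) =>
  val fromB = HbaseObject.table.getRow("tableB", a, "cf", "value1")
  if (fromB != null) {
    // found in table B: use that value as the key for the next lookup
    val next = HbaseObject.table.getRow("anotherTable", Bytes.toString(fromB), "cf", "value1")
    if (next != null) Bytes.toString(next) else null
  } else {
    // not found: fall back to looking up value C in table D
    val fromD = HbaseObject.table.getRow("tableD", c, "cf", "value1")
    if (fromD != null) Bytes.toString(fromD) else null
  }
}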

Since the Java HBase client is not serializable, I am passing the HBase object around like this:

object HbaseObject {
 val table = new HbaseUtilities(zkUrl)
}

HbaseUtilities is a class I wrote to simplify lookups. It just creates a Java HBase client and provides a wrapper for the kind of get commands I need.
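For reference, a minimal sketch of such a wrapper (not my exact class, but the getRow used above has this shape):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

// Simplified sketch of an HbaseUtilities-style wrapper around the Java HBase client
class HbaseUtilities(zkUrl: String) {
  private val config: Configuration = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", zkUrl)
  private val connection: Connection = ConnectionFactory.createConnection(config)

  // returns the raw cell bytes, or null if the row/cell does not exist
  def getRow(tableName: String, rowKey: String, cf: String, qualifier: String): Array[Byte] = {
    val table = connection.getTable(TableName.valueOf(tableName))
    try {
      val result = table.get(new Get(Bytes.toBytes(rowKey)))
      result.getValue(Bytes.toBytes(cf), Bytes.toBytes(qualifier))
    } finally {
      table.close()
    }
  }
}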

This is making my code quite slow, which in itself is acceptable. What's puzzling me is that increasing or decreasing the number of executors or cores has no effect on the speed of my code: whether it's 1 executor or 30, it runs at the exact same rate. That makes me believe there is a lack of parallelism, and that all my workers must be sharing the same HBase object. Is there a way I can instantiate one such object on each worker before they start executing? I have already tried using lazy val; it has no effect.
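(What I tried with lazy val was simply this, hoping each executor JVM would build its own client on first access:)

object HbaseObject {
  // lazy so the client is only built when first used on each JVM
  lazy val table = new HbaseUtilities(zkUrl)
}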

I have even tried creating a sharedSingleton as shown here https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/, which solved some problems for me but not the loss of parallelism.

I know there might be better ways to solve the problem, and all suggestions are very welcome, but right now I'm working under a few constraints and a tight timeline.


2 Answers

1 vote

You need to create all non-serializable objects in the executor. You can use foreachPartition or mapPartitions to create a connection in each executor.

Something similar to this (I'm using HBase client 2.0.0):

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Result}
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}


df.foreachPartition { partition =>
  // for each partition (i.e. on the executor), create the connection and the table
  val config: Configuration = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", "zk url")
  val connection: Connection = ConnectionFactory.createConnection(config)
  val table = connection.getTable(TableName.valueOf("tableName"))

  partition.foreach { record =>
    val byteKey = Bytes.toBytes(record.getString(0))
    val get = new Get(byteKey)
    val result = table.get(get)
    // DO YOUR LOGIC HERE FOR EACH RECORD
  }

  table.close()
  connection.close()
}

df is the DataFrame containing the records you want to look up.

You can create as many tables as you need for each executor from the same connection.

As you create all the objects in the executors, you don't need to deal with serialization problems. You can keep the logic in a class like your HbaseUtilities and use it there, but you need to create a new instance of it only inside foreachPartition/mapPartitions.
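If you need the looked-up values back as a Dataset instead of only side effects, a mapPartitions version of the same idea looks roughly like this (a sketch: the table, column family and qualifier names are placeholders, and spark is assumed to be your SparkSession):

import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import spark.implicits._ // String encoder; 'spark' is your SparkSession

val lookedUp = df.mapPartitions { partition =>
  // one connection per partition, created on the executor
  val config = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", "zk url")
  val connection = ConnectionFactory.createConnection(config)
  val table = connection.getTable(TableName.valueOf("tableName"))

  // materialize the results before closing the connection, since iterators are lazy
  val results = partition.map { record =>
    val result = table.get(new Get(Bytes.toBytes(record.getString(0))))
    Option(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("value1")))
      .map(Bytes.toString)
      .orNull
  }.toList

  table.close()
  connection.close()
  results.iterator
}

The toList before closing the connection matters: the iterator you return is consumed after your function exits, so the gets have to be forced while the connection is still open.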

0 votes

You can accomplish what you are trying to do by using the HBase-Spark Connector from the main branch of the HBase project. For some reason the connector doesn't seem to be included in any official HBase builds, but you can build it yourself and it works fine. Just build the jar and include it in your pom.xml.

Once built, the connector will allow you to pass the HBase Connection object inside the Worker class, so you don't have to worry about serializing the connection or building singletons/etc.

For example:

JavaSparkContext jSPContext ...; //Create Java Spark Context
HBaseConfiguration hbConf = HBaseConfiguration.create();
hbConf.set("hbase.zookeeper.quorum", zkQuorum);
hbConf.set("hbase.zookeeper.property.clientPort", PORT_NUM);
// this is your key link to HBase from Spark -- use it every time you need to access HBase inside the Spark parallelism:
JavaHBaseContext hBaseContext = new JavaHBaseContext(jSPContext, hbConf);   

// Create an RDD and parallelize it with HBase access:
JavaRDD<String> myRDD = ... //create your RDD
hBaseContext.foreachPartition(myRDD,  new SparkHBaseWorker());
// You can also do other usual Spark tasks, like mapPartitions, forEach, etc.

// The Spark worker class for foreachPartition use-case on RDD of type String would look something like this:
class SparkHBaseWorker implements VoidFunction<Tuple2<Iterator<String>, Connection>>
{
    private static final long serialVersionUID = 1L;
    
    public SparkHBaseWorker()
    {
    }
    
// Put all your HBase logic into this function:
    @Override
    public void call(Tuple2<Iterator<String>, Connection> t) throws Exception
    {           
        // This is your HBase connection object:
        Connection conn = t._2();
        // Now you can do direct access to HBase from this Spark worker node:
        Table hbTable = conn.getTable(TableName.valueOf(MY_TABLE));
        // now do something with the table/etc.
    }
}