Working with Spark Structured Streaming
I am working on code where I need to do a lot of lookups on the data. The lookups are very complex and just don't translate well to joins.
For example: look up field A in table B and fetch a value; if that value is found, look it up in another table; if not, look up some other value C in table D, and so on.
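Purely as illustration, this is the shape of one such chained lookup, with plain in-memory Maps standing in for the HBase tables (all names and keys here are made up):

    val tableB = Map("A1" -> "K9")    // stand-in for HBase table B
    val tableC = Map("K9" -> "fromC") // stand-in for the follow-up table
    val tableD = Map("C7" -> "fromD") // stand-in for the fallback table

    def chainedLookup(a: String, c: String): Option[String] =
      tableB.get(a) match {
        case Some(v) => tableC.get(v) // found in B: chase the fetched value into the next table
        case None    => tableD.get(c) // not found in B: fall back to value C in table D
      }

    chainedLookup("A1", "C7") // Some("fromC")
    chainedLookup("XX", "C7") // Some("fromD")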
I managed to write these lookups using HBase, and functionally it works fine. I wrote UDFs for each of these lookups; a very simple one might be:
    import org.apache.spark.sql.functions.{col, udf}
    import org.apache.hadoop.hbase.util.Bytes

    // Looks up `code` in lookupTable and decodes the cell value; null when the row is absent
    val someColFunc = udf { (code: String) =>
      val value = HbaseObject.table.getRow("lookupTable", code, "cf", "value1")
      if (value != null) Bytes.toString(value) else null
    }
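For context, here is roughly how I apply such a UDF in the streaming job; the source below is hypothetical (broker, topic, and column names are made up, and spark is the usual SparkSession):

    // Hypothetical streaming source; the real one differs
    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "codes")
      .load()
      .selectExpr("CAST(value AS STRING) AS code")

    // Every incoming row triggers an HBase lookup through the UDF
    val enriched = input.withColumn("value1", someColFunc(col("code")))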
Since the Java HBase client is not serializable, I am passing the HBase object like this:
    object HbaseObject {
      val table = new HbaseUtilities(zkUrl)
    }
HbaseUtilities is a class I wrote to simplify lookups. It just creates a Java HBase client and provides a wrapper for the kinds of get commands I need.
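A stripped-down sketch of it (the real class has more wrappers, but getRow is the one used above):

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get}
    import org.apache.hadoop.hbase.util.Bytes

    class HbaseUtilities(zkUrl: String) {
      private val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", zkUrl)
      private val connection: Connection = ConnectionFactory.createConnection(conf)

      // Single-row GET: returns the raw cell bytes, or null if the row/column is absent
      def getRow(tableName: String, rowKey: String, cf: String, qualifier: String): Array[Byte] = {
        val table = connection.getTable(TableName.valueOf(tableName))
        try {
          val result = table.get(new Get(Bytes.toBytes(rowKey)))
          result.getValue(Bytes.toBytes(cf), Bytes.toBytes(qualifier))
        } finally {
          table.close()
        }
      }
    }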
This is rendering my code quite slow, which, by itself, would be alright. What's puzzling me is that increasing or decreasing the number of executors or cores has no effect on the speed of my code: be it 1 executor or 30, it runs at exactly the same rate. That makes me believe there is a lack of parallelism, and that all my workers must be sharing the same HBase object. Is there a way I can instantiate one such object on each worker before they start executing? I have already tried using a lazy val, with no effect; the variant I tried is sketched below.
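    object HbaseObject {
      // Hoped this would defer construction so each executor JVM builds its own
      // client on first use; in practice it changed nothing for me.
      lazy val table = new HbaseUtilities(zkUrl)
    }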
I have even tried creating a SharedSingleton as shown here: https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/. That solved some problems for me, but not the loss of parallelism.
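My version follows roughly the shape from that post:

    class SharedSingleton[T](constructor: => T) extends Serializable {
      // @transient + lazy: the instance is rebuilt on first access after deserialization,
      // so each JVM that receives a copy constructs its own T
      @transient private lazy val instance: T = constructor
      def get: T = instance
    }

    object SharedSingleton {
      def apply[T](constructor: => T): SharedSingleton[T] = new SharedSingleton[T](constructor)
    }

    val sharedTable = SharedSingleton { new HbaseUtilities(zkUrl) }
    // inside a udf: sharedTable.get.getRow(...)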
I know there might be better ways to solve this problem, and all suggestions are very welcome, but right now I'm caught between a few constraints and a tight timeline.