I used hbase-spark to record pv/uv in my Spark Streaming project. When I killed the app and restarted it, I got the following exception during checkpoint recovery:

16/03/02 10:17:21 ERROR HBaseContext: Unable to getConfig from broadcast
java.lang.ClassCastException: [B cannot be cast to org.apache.spark.SerializableWritable
    at com.paitao.xmlife.contrib.hbase.HBaseContext.getConf(HBaseContext.scala:645)
    at com.paitao.xmlife.contrib.hbase.HBaseContext.com$paitao$xmlife$contrib$hbase$HBaseContext$$hbaseForeachPartition(HBaseContext.scala:627)
    at com.paitao.xmlife.contrib.hbase.HBaseContext$$anonfun$com$paitao$xmlife$contrib$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:457)
    at com.paitao.xmlife.contrib.hbase.HBaseContext$$anonfun$com$paitao$xmlife$contrib$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:457)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I checked the code of HBaseContext: it uses a broadcast variable to store the HBase configuration.

class HBaseContext(@transient sc: SparkContext,
               @transient config: Configuration,
               val tmpHdfsConfgFile: String = null) extends Serializable with Logging {

    @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
    @transient var tmpHdfsConfiguration: Configuration = config
    @transient var appliedCredentials = false
    @transient val job = Job.getInstance(config)

    TableMapReduceUtil.initCredentials(job)
    // <-- broadcast for HBaseConfiguration here !!!
    var broadcastedConf = sc.broadcast(new SerializableWritable(config))
    var credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials()))
    ...

During checkpoint recovery, it tries to access this broadcast value in its getConf function:

if (tmpHdfsConfiguration == null) {
  try {
    // this cast fails during checkpoint recovery and produces the log above
    tmpHdfsConfiguration = configBroadcast.value.value
  } catch {
    case ex: Exception => logError("Unable to getConfig from broadcast", ex)
  }
}

Then the exception is raised. My question is: is it possible to recover a broadcast value from a checkpoint in a Spark application? Or do we have some other solution to re-broadcast the value after recovering?

Thanks for any feedback!


2 Answers

2 votes

Currently, this is a known bug in Spark. Contributors have been investigating the issue but have made no progress so far.

Here's my workaround: instead of loading the data into a broadcast variable and broadcasting it to all executors, I let each executor load the data itself into a singleton object.
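A minimal sketch of this pattern, assuming the per-executor data is an HBase Configuration as in the question; HBaseConfigHolder is an illustrative name, not from the original code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration

// Initialized lazily, at most once per executor JVM. The object itself is
// never serialized into a closure, so checkpoint recovery cannot hand back
// a corrupted value the way a broadcast can.
object HBaseConfigHolder {
  lazy val conf: Configuration = HBaseConfiguration.create()
}

Inside the streaming job, executors then reference the singleton directly instead of a broadcast:

rdd.foreachPartition { partition =>
  val conf = HBaseConfigHolder.conf // built locally on the executor
  // ... write pv/uv mutations to HBase here ...
}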

By the way, follow this issue for updates: https://issues.apache.org/jira/browse/SPARK-5206

2 votes

Follow the approach below:

  1. Create the Spark context.
  2. Initialize the broadcast variable.
  3. Create the streaming context with a checkpoint directory, using the above Spark context and passing in the initialized broadcast variable.

When the streaming job starts with no data in the checkpoint directory, it will initialize the broadcast variable.

When streaming restarts, it will recover the broadcast variable from the checkpoint directory, as in the sketch below.
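A minimal sketch of this approach with the DStream API; the checkpoint path, app name, and broadcast contents are placeholders, not from the original answer:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/checkpoint" // placeholder path

// 1. Create the Spark context.
val sc = new SparkContext(new SparkConf().setAppName("pv-uv-demo"))

// 2. Initialize the broadcast variable on that context.
val configBroadcast = sc.broadcast(Map("hbase.zookeeper.quorum" -> "zk1,zk2,zk3"))

// 3. Create the streaming context with the checkpoint directory,
//    closing over the already-initialized broadcast variable.
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... build DStreams here; inside transformations, read
  // configBroadcast.value rather than re-broadcasting ...
  ssc
}

// Fresh start: createContext() runs and the broadcast is freshly initialized.
// Restart: the streaming context is rebuilt from the checkpoint data.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()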