
I am using Spark Streaming (Spark v1.6.0) together with HBase (HBase v1.1.2) in my project, and the HBase configuration is shared among executors via a broadcast variable. The Spark Streaming application works fine at first, but after about 2 days of running an exception appears.

  import org.apache.hadoop.hbase.client.Increment
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.streaming.dstream.DStream
  // (the HBaseContext import depends on the HBase-on-Spark module in use)

  val hBaseContext: HBaseContext = new HBaseContext(sc, HBaseCock.hBaseConfiguration())

  // Turn each (rowKey, delta) pair in the stream into an HBase Increment
  // and write them in batches of `batchSize`.
  private def _materialDStream(dStream: DStream[(String, Int)], columnName: String, batchSize: Int) = {
    hBaseContext.streamBulkIncrement[(String, Int)](
      dStream,
      hTableName,
      (t) => {
        val rowKey = t._1
        val incVal = t._2
        val increment = new Increment(Bytes.toBytes(rowKey))
        increment.addColumn(Bytes.toBytes(hFamily), Bytes.toBytes(columnName), incVal)
        increment
      }, batchSize)
  }
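
For context, this is roughly how the helper is wired into the job; it is only a sketch, assuming `ssc` is the job's StreamingContext, and the stream source, column name and batch size are made up for illustration:

  // Illustrative wiring: count words from a socket stream and increment
  // the per-word counter column in HBase every micro-batch.
  val counts: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999)
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  _materialDStream(counts, "wordCount", 500)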

The whole source file for HBaseContext can be found in HBaseContext.scala; the relevant snippets are included below.

After days of running, the exception appears, and the stack trace is:

Unable to getConfig from broadcast
16/02/01 10:08:10 ERROR Executor: Exception in task 3.0 in stage 187175.0 (TID 561527)

The logic is as follows:

  1. Create the HBaseContext with a config (HBaseContext) and broadcast that config (also saving the config to a file if a file path is specified).
  2. Before connecting to HBase, it first checks whether the configuration field is null; if so, it restores the config from the specified file, or, if no file path was specified, from the broadcast variable (a rough sketch of the save/restore steps follows this list).
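
The save/restore steps are elided in the class snippet further down; here is a minimal sketch of what they amount to, assuming Hadoop's Configuration is serialized as a Writable to an HDFS path (the helper names are made up for illustration):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Sketch of "save the config to file": serialize the Configuration to an HDFS file.
  def saveConfToFile(config: Configuration, tmpHdfsConfgFile: String): Unit = {
    val fs = FileSystem.newInstance(config)
    val out = fs.create(new Path(tmpHdfsConfgFile))
    config.write(out)            // Configuration is a Hadoop Writable
    out.close()
  }

  // Sketch of "restore it from the specified file": deserialize the Configuration back.
  def readConfFromFile(tmpHdfsConfgFile: String): Configuration = {
    val fs = FileSystem.newInstance(new Configuration())
    val in = fs.open(new Path(tmpHdfsConfgFile))
    val restored = new Configuration(false)
    restored.readFields(in)
    in.close()
    restored
  }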

The problem happens when restoring the configuration from the broadcast variable: the exception is thrown while reading the value from the broadcast, in "configBroadcast.value.value".

My guess is that Spark Streaming does not restore broadcast variables if the master/driver fails, even though getOrCreate() is used to recover the StreamingContext instance. I am also curious why, in the HBaseContext.scala source code, a file is preferred over the broadcast variable for restoring values. So what is the best practice for using broadcast variables in Spark Streaming? Do I need to store them in files, say files on HDFS?

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.broadcast.Broadcast

class HBaseContext(@transient sc: SparkContext,
                   @transient config: Configuration,
                   val tmpHdfsConfgFile: String = null) extends Serializable {

  // Not serialized with the closure; null on the executors until restored.
  @transient var tmpHdfsConfiguration: Configuration = config

  // The config is also shipped to the executors via a broadcast variable.
  val broadcastedConf = sc.broadcast(new SerializableWritable(config))

  if (tmpHdfsConfgFile != null && config != null) {
    // save config to file
  }

  private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): Configuration = {

    // 1. Reuse the cached config, or restore it from the file if a path was given.
    if (tmpHdfsConfiguration != null) {
      tmpHdfsConfiguration
    } else if (tmpHdfsConfgFile != null) {
      // read config from file

      tmpHdfsConfiguration
    }

    // 2. Fall back to the broadcast variable.
    if (tmpHdfsConfiguration == null) {
      try {
        // Exception happens here!!!
        tmpHdfsConfiguration = configBroadcast.value.value
        tmpHdfsConfiguration
      } catch {
        case ex: Exception =>
          println("Unable to getConfig from broadcast")
      }
    }
    tmpHdfsConfiguration
  }
}

1 Answer


Broadcast variables get reset when the Spark job is restarted for some reason, or when the driver is re-associated with a new attempt after a job failure.

In the case of a streaming job, to use a broadcast variable you should initialize the broadcast from the SparkContext just before creating the StreamingContext. This ensures the broadcast variables are available when the streaming starts.

// Create the SparkContext first, then the broadcast, and only then the StreamingContext,
// so the broadcast already exists when getOrCreate() recovers from the checkpoint.
JavaSparkContext javaSparkContext = createSparkContext();

Broadcast<BroadcastContext> broadcastContext = getBroadcastContext(javaSparkContext);

JavaStreamingContext javaStreamingContext = JavaStreamingContext.getOrCreate(sparkCheckPointDir,
                () -> processor.create(sparkCheckPointDir, javaSparkContext));
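
For the Scala side of the question, the same pattern would look roughly like this. It is only a sketch: HBaseCock.hBaseConfiguration() is taken from the question, while the app name, batch interval and checkpoint path are placeholders.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Create the SparkContext and the broadcast before the StreamingContext,
// so the broadcast is initialized independently of checkpoint recovery.
val sc = new SparkContext(new SparkConf().setAppName("hbase-streaming"))

val hbaseConfBroadcast: Broadcast[SerializableWritable[Configuration]] =
  sc.broadcast(new SerializableWritable(HBaseCock.hBaseConfiguration()))

val checkpointDir = "/tmp/spark-checkpoint"   // placeholder path

val ssc = StreamingContext.getOrCreate(checkpointDir, () => {
  val newSsc = new StreamingContext(sc, Seconds(10))
  newSsc.checkpoint(checkpointDir)
  // build the DStream graph here, reading the config via hbaseConfBroadcast.value.value
  newSsc
})

ssc.start()
ssc.awaitTermination()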