
I have a Spark Streaming application (Spark 1.6.1) that reads data from Kafka. I'm using a Spark checkpoint directory on HDFS to recover from failures.

The code creates a Broadcast variable holding a configuration map at the start of each batch, as follows:

public static void execute(JavaPairDStream<String, MyEvent> events) {
    final Broadcast<Map<String, String>> appConfig =
        MyStreamConsumer.ApplicationConfig.getInstance(
            new JavaSparkContext(events.context().sparkContext()));

When I submit the job, everything works fine and all events from Kafka are processed correctly. The issue arises when recovering from a failure (tested by rebooting the machine Spark runs on): the streaming application actually starts up correctly and everything looks fine in the UI, with the job running, but as soon as data is sent through, the exception below is thrown when this method is called, and the job crashes:

appConfig.value() (the broadcast variable from the start!)

Failed due to Spark job error

  Caused by: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to java.util.Map

If I kill the driver in the Spark UI and resubmit the job from the command line, everything works fine again. But it's a requirement of our product that it auto-recovers from faults, or even just a reboot of any cluster node, so I have to fix the above. The issue is definitely related to the use of the Broadcast variable and to loading the state from the Spark checkpoint directory after the reboot.
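For context, the streaming context is recovered from the checkpoint in the usual Spark 1.6 way. A minimal sketch of that setup (the checkpoint path, app name, and batch interval here are placeholders, not my real values):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingApp {
    // Placeholder path; the real application uses an HDFS directory.
    private static final String CHECKPOINT_DIR = "hdfs:///checkpoints/my-app";

    public static JavaStreamingContext createOrRecover() {
        // The factory function only runs when no checkpoint exists. On recovery,
        // the entire DStream graph - including closures that captured broadcast
        // variables - is deserialized from the checkpoint instead, which is
        // where the ClassCastException appears.
        return JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, () -> {
            SparkConf conf = new SparkConf().setAppName("my-app");
            JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(10));
            jssc.checkpoint(CHECKPOINT_DIR);
            // ... create the Kafka DStream and call execute(events) here ...
            return jssc;
        });
    }
}
```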

Also note that I do create the broadcast instance correctly (lazily, as a singleton):

public static Broadcast<Map<String, String>> getInstance(JavaSparkContext sparkContext) {
    if (instance == null) {
        // loadConfigMap() stands in for however the Map is actually built
        instance = sparkContext.broadcast(loadConfigMap());
    }
    return instance;
}
I do realize the issue seems to be related to: Is it possible to recover an broadcast value from Spark-streaming checkpoint

but I was not able to fix the issue by following the instructions there.


1 Answer


Answering my own question, as others might find it useful. Strangely, the following fixed the issue: I moved the call appConfig.value() to the start of the execute method and assigned the result to a plain local map variable there.

Then, if I use this map variable in my anonymous FlatMapFunction code instead of appConfig.value(), everything works fine, even after a reboot.
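A minimal sketch of the workaround. The FlatMapFunction body and the MyEvent field access are made up for illustration; only the appConfig handling reflects the actual change:

```java
import java.util.Arrays;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

public static void execute(JavaPairDStream<String, MyEvent> events) {
    final Broadcast<Map<String, String>> appConfig =
        MyStreamConsumer.ApplicationConfig.getInstance(
            new JavaSparkContext(events.context().sparkContext()));

    // Workaround: dereference the broadcast once here and capture the plain
    // Map in the closure, instead of capturing the Broadcast handle itself.
    final Map<String, String> config = appConfig.value();

    events.flatMap(new FlatMapFunction<Tuple2<String, MyEvent>, String>() {
        @Override
        public Iterable<String> call(Tuple2<String, MyEvent> event) {
            // Use the captured map, not appConfig.value(), inside the task.
            return Arrays.asList(config.get(event._1()));
        }
    });
}
```

Note that this copies the map into every serialized task closure rather than using the broadcast mechanism, but it survives checkpoint recovery, which was the requirement.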

Again, I'm not sure why this works, but it does...