I have a Spark Streaming application (Spark 1.6.1) that reads data from Kafka. I'm using a Spark checkpoint directory on HDFS to recover from failures.
The code uses a Broadcast variable holding a Map, obtained at the start of each batch as follows:
    public static void execute(JavaPairDStream<String, MyEvent> events) {
        final Broadcast<Map<String, String>> appConfig =
            MyStreamConsumer.ApplicationConfig.getInstance(
                new JavaSparkContext(events.context().sparkContext()));
        // ... per-batch processing of events is wired up here ...
    }
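The value is then read inside each batch, along these lines (a simplified sketch; the processing body is illustrative, the other names are from my code):

    events.foreachRDD(new VoidFunction<JavaPairRDD<String, MyEvent>>() {
        @Override
        public void call(JavaPairRDD<String, MyEvent> rdd) {
            // This appConfig.value() call is the one that throws after recovery (see below)
            final Map<String, String> config = appConfig.value();
            // ... per-batch processing that reads entries from config ...
        }
    });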
When I submit the job, everything works fine at run time and all events from Kafka are processed correctly. The issue arises when recovering from a failure (tested by rebooting the machine Spark runs on): the streaming application actually starts up correctly, and everything looks fine in the UI with the job running, but as soon as data is sent through, the job crashes with the exception below, thrown from the call to

    appConfig.value()

(the Broadcast variable from the start!):
    Failed due to Spark job error
    Caused by: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to java.util.Map
If I kill the driver in the Spark UI and resubmit the job from the command line, everything works fine again. But it's a requirement of our product that it auto-recovers from faults, or even plain reboots of any cluster node, so I have to fix the above. The issue is definitely related to the use of the Broadcast variable and loading state from the Spark checkpoint directory after the reboot; presumably the recovered handle's broadcast ID now resolves to one of Spark's internal broadcasts (Spark broadcasts the Hadoop Configuration wrapped in SerializableConfiguration) rather than to my Map.
Also note that I do create the broadcast instance correctly (lazily, as a singleton; the body below is sketched, the config-loading helper name is illustrative):
    public static Broadcast<Map<String, String>> getInstance(JavaSparkContext sparkContext) {
        if (instance == null) {
            instance = sparkContext.broadcast(loadConfigMap()); // helper name illustrative
        }
        return instance;
    }
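with the instance field declared at class level, e.g. (a sketch, not my exact declaration; the volatile modifier follows the lazily-instantiated singleton example in the Spark Streaming programming guide):

    // Static handle, created lazily rather than recovered from the checkpoint
    private static volatile Broadcast<Map<String, String>> instance = null;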
I do realize the issue seems to be related to: Is it possible to recover an broadcast value from Spark-streaming checkpoint, but I was not able to fix the issue following the instructions there.
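For reference, my understanding of the pattern suggested there (and in Spark's JavaRecoverableNetworkWordCount example) is to call getInstance inside foreachRDD, using the SparkContext recovered with each RDD, so the broadcast is lazily re-created after a driver restart instead of being read back from the checkpoint. Adapted to my types, the sketch looks like this (the processing body is illustrative):

    events.foreachRDD(new VoidFunction<JavaPairRDD<String, MyEvent>>() {
        @Override
        public void call(JavaPairRDD<String, MyEvent> rdd) {
            // Re-obtain the broadcast from the restored context each batch;
            // after a restart this creates a fresh broadcast instead of
            // reusing the stale handle serialized into the checkpoint.
            Broadcast<Map<String, String>> appConfig =
                MyStreamConsumer.ApplicationConfig.getInstance(
                    JavaSparkContext.fromSparkContext(rdd.context()));
            Map<String, String> config = appConfig.value();
            // ... per-batch processing using config ...
        }
    });

Even restructured this way, I was not able to get recovery to work, so any pointers on what I am missing would be appreciated.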