I have a Spark Streaming pipeline integrated with Kafka, and I have also configured checkpointing. To test resiliency, I killed the job manually and then restarted it, and I am getting the exception below:
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@1d304ac has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:267)
Code I used:
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    public JavaStreamingContext create() {
        final SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        sparkConf.setMaster("local[2]");
        // New context with a 2-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        jssc.checkpoint("D:\\Checkpoint");
        return jssc;
    }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate("D:\\Checkpoint", contextFactory);

int numThreads = 1;
Kindly suggest what I am doing wrong.
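For reference, my understanding of the intended pattern from the Spark Streaming programming guide is that everything that defines the streaming computation, including the DStream creation and transformations, has to live inside the factory's create() method, so that it can be rebuilt from the checkpoint on restart; only start()/awaitTermination() go outside. A minimal sketch of that structure (with a hypothetical socket source standing in for my Kafka wiring, so it is not my actual job):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class CheckpointSketch {
    public static void main(String[] args) throws InterruptedException {
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                SparkConf conf = new SparkConf()
                        .setAppName("JavaKafkaWordCount")
                        .setMaster("local[2]");
                JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));
                jssc.checkpoint("D:\\Checkpoint");
                // All DStream setup happens inside create(), so a restart
                // from the checkpoint can reconstruct the same lineage.
                // Hypothetical source; my real job uses a Kafka stream here.
                JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
                lines.print();
                return jssc;
            }
        };

        // Recovers from the checkpoint if it exists, otherwise calls create()
        JavaStreamingContext jssc =
                JavaStreamingContext.getOrCreate("D:\\Checkpoint", factory);
        jssc.start();
        jssc.awaitTermination();
    }
}
```

Is the problem that my stream setup runs after getOrCreate instead of inside create()?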