
I have a Spark Streaming pipeline integrated with Kafka, and I have configured checkpointing. To test resiliency, I killed the job manually and restarted it, and now I get the exception below:

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@1d304ac has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:267)

Code I used:

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    public JavaStreamingContext create() {
        final SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        sparkConf.setMaster("local[2]");

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));  // new context

        jssc.checkpoint("D:\\Checkpoint");
        return jssc;
    }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate("D:\\Checkpoint", contextFactory);
int numThreads = Integer.parseInt(1+"");

What am I doing wrong?


2 Answers

private static JavaStreamingContext createContext(){
    final SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    sparkConf.setMaster("local[2]");

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));  // new context

    jssc.checkpoint("D:\\Checkpoint");
    return jssc;
}

Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() {
    @Override
    public JavaStreamingContext call() {
        return createContext();
    }
};

JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate("D:\\Checkpoint", createContextFunc);

Use Function0 instead of JavaStreamingContextFactory; this works for me.


See the Checkpointing section of the Spark Streaming Programming Guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

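You didn't show the code that creates your DStreams and transformations, but all of them must be defined inside the create() method. When getOrCreate() finds checkpoint data, it rebuilds the context and its DStream graph from that data and does not call create() at all. Any DStream you define on the recovered context after getOrCreate() returns was never part of the checkpointed graph and is never initialized, which is why Spark reports that the ShuffledDStream "has not been initialized".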
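A minimal sketch of that pattern, assuming Spark 2.x or later (where FlatMapFunction returns an Iterator). Because the question does not show how the Kafka stream is created, it uses socketTextStream on localhost:9999 as a stand-in source; the class name RecoverableWordCount and the helper splitWords are hypothetical. The point is that the whole graph, including the reduceByKey that produces the ShuffledDStream, is built inside createContext(), and main() only starts whatever context getOrCreate() returns.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class RecoverableWordCount {

    // Same checkpoint directory as in the question.
    public static final String CHECKPOINT_DIR = "D:\\Checkpoint";

    // Hypothetical helper: splits a line into words. Plain Java, so it can be
    // tested without a running Spark job.
    public static List<String> splitWords(String line) {
        return Arrays.asList(line.split(" "));
    }

    // Builds the context AND the complete DStream graph.
    // getOrCreate() calls this only when no checkpoint data exists yet.
    static JavaStreamingContext createContext(String checkpointDir) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaKafkaWordCount")
                .setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        jssc.checkpoint(checkpointDir);

        // ASSUMPTION: stand-in source; replace it with the Kafka stream from your pipeline.
        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // reduceByKey yields a ShuffledDStream; defining it here makes it part of
        // the graph that is written to, and later restored from, the checkpoint.
        JavaPairDStream<String, Integer> counts = lines
                .flatMap(line -> splitWords(line).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);
        counts.print();
        return jssc;
    }

    public static void main(String[] args) throws InterruptedException {
        // Invoked by getOrCreate() only when there is no checkpoint to recover from.
        Function0<JavaStreamingContext> factory = () -> createContext(CHECKPOINT_DIR);

        // Recovers the context and its DStream graph from the checkpoint if present,
        // otherwise builds a fresh one through the factory.
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, factory);

        // Do not define new DStreams or transformations on jssc here: after a
        // recovery they would not be initialized.
        jssc.start();
        jssc.awaitTermination();
    }
}
```

On the first run no checkpoint exists, so getOrCreate() calls the factory; after you kill and restart the job, it rebuilds the same graph from D:\Checkpoint without calling it. If you later change the pipeline code, the existing checkpoint may no longer be usable and may need to be deleted.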