2 votes

We are trying to build a fault-tolerant Spark Streaming job, and there is one problem we keep running into. Here is our scenario:

    1) Start a Spark Streaming job that runs batches of 2 minutes
    2) Checkpointing is enabled, and the streaming context is configured to either create a new context or rebuild from the checkpoint if one exists
    3) After a particular batch completes, the Spark Streaming job is manually killed using yarn application -kill (basically mimicking a sudden failure)
    4) The Spark Streaming job is then restarted from the checkpoint

The issue we are having is that after the Spark Streaming job is restarted, it replays the last successful batch. It always does this: only the last successful batch is replayed, never the earlier batches.

The side effect of this is that the data from that batch is duplicated. We even tried waiting more than a minute after the last successful batch before killing the process (in case writing the checkpoint takes time), but that did not help.

Any insights? I am hoping someone has faced this as well and can share some ideas or insights; a stripped-down sketch of the setup is below, and I can post the actual relevant code if that helps. Shouldn't Spark Streaming checkpoint right after a successful batch so that it is not replayed after a restart? Does it matter where I place the ssc.checkpoint call?
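
Roughly how the context is set up (a stripped-down sketch; the object name, app name and checkpoint path are placeholders, not the actual ones):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, StreamingContext}

    object CheckpointedJob {

      // Placeholder checkpoint location on HDFS
      val checkpointDir = "hdfs:///user/spark/streaming-checkpoint"

      // Factory that builds a fresh context; only called when no checkpoint exists
      def createContext(): StreamingContext = {
        val conf = new SparkConf().setAppName("checkpointed-streaming-job")
        val ssc = new StreamingContext(conf, Minutes(2)) // 2-minute batches
        ssc.checkpoint(checkpointDir)                    // checkpoint set inside the factory
        // ... input DStreams and output operations are defined here ...
        ssc
      }

      def main(args: Array[String]): Unit = {
        // Rebuild from the checkpoint if one exists, otherwise create a new context
        val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }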


3 Answers

2 votes

You have the answer in the last line of your question: the placement of ssc.checkpoint() matters. When you restart the job from the saved checkpoint, it comes up with whatever state was saved there. In your case, since you killed the job right after a batch completed, the most recent saved state is that last successful batch, so it is the one that gets replayed. By now you have probably understood that checkpointing is mainly there to pick up from where you left off, especially for failed jobs.
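
One way to make the replay harmless (not something covered above, just a common pattern) is to keep the output idempotent: for example, write each batch to a location derived from its batch time, so a replayed batch overwrites its own output instead of appending duplicates. A rough sketch with a made-up output path:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.dstream.DStream

    // Sketch: write each batch under a directory named after its batch time, so a
    // replayed batch simply rewrites the same directory. The path is illustrative.
    def saveBatchesIdempotently(stream: DStream[String]): Unit = {
      stream.foreachRDD { (rdd, batchTime: Time) =>
        val outPath = s"hdfs:///user/spark/output/batch-${batchTime.milliseconds}"
        val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
        fs.delete(new Path(outPath), true) // no-op if the directory does not exist yet
        rdd.saveAsTextFile(outPath)
      }
    }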

0 votes

There are two things that need to be taken care of:

1] Ensure that the same checkpoint directory is used in the getOrCreate streaming context method when you restart the program.

2] Set "spark.streaming.stopGracefullyOnShutdown" to "true" (a snippet is below). This lets Spark finish processing the current data and update the checkpoint directory accordingly before shutting down. If it is set to false, a shutdown may leave corrupt data in the checkpoint directory.
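
For example, the flag can be set on the SparkConf used to build the streaming context (the app name below is just a placeholder), or passed as --conf spark.streaming.stopGracefullyOnShutdown=true on spark-submit:

    import org.apache.spark.SparkConf

    // Enable graceful shutdown so an in-flight batch is finished and the
    // checkpoint is updated before the context stops
    val conf = new SparkConf()
      .setAppName("my-streaming-app") // placeholder name
      .set("spark.streaming.stopGracefullyOnShutdown", "true")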

Note: Please post code snippets if possible. And yes, the placement of ssc.checkpoint does matter.

0 votes

In such a scenario, one should ensure that the checkpoint directory used in the streaming context method is the same after the Spark application restarts. Hopefully that helps.