2
votes

I have Spark 1.4 Streaming application, which reads data from Kafka, uses statefull transformation, and has batch interval of 15 seconds.

In order to use statefull transformations, as well as recover from driver failures, I need to set checkpointing on streaming context.

Also, in the Spark 1.4 documentation they recommend DStream checkpointing 5-10 times of the batch interval.

So my questions is:

What happens if I only set checkpointing on spark streaming context? I guess DStreams will be checkpointed every batch interval?

What If I set both checkpointing on streaming context as well as the moment I read data from Kafka, I set:

DStream.checkpoint(90 seconds)

What will be the intervals for metadata checkpointing and what for data checkpointing (meaning DStreams)?

Thank you.

1

1 Answers

2
votes

I guess DStreams will be checkpointed every batch interval?

No, Spark will checkpoint your data every batch interval multiplied by a constant. This means that if your batch interval is 15 seconds, data will be checkpointed every multiple of 15 seconds. In mapWithState, for example, which is a stateful stream, you can see the batch interval is multiplied by 10:

private[streaming] object InternalMapWithStateDStream {
  private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}

What will be the intervals for metadata checkpointing and what for data checkpointing (meaning DStreams)?

If you set the checkpoint duration to 90 seconds on the DStream, then that'll be your checkpoint duration, meaning every 90 seconds the data will get checkpointed. You cannot set the checkpoint duration directly on the StreamingContext, all you can do with it is pass the checkpoint directory. The overload of checkpoint only takes a String:

/**
 * Set the context to periodically checkpoint the DStream operations for driver
 * fault-tolerance.
 * @param directory HDFS-compatible directory where the checkpoint
 *        data will be reliably stored.
 *        Note that this must be a fault-tolerant file system like HDFS.
 */
def checkpoint(directory: String)

Edit

For updateStateByKey, it seems that the time for checkpointing is set to be the batch time multiplied by Seconds(10) / slideDuration:

// Set the checkpoint interval to be slideDuration or 10 seconds,
// which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
  checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
  logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
}