I guess DStreams will be checkpointed every batch interval?
No, Spark will checkpoint your data at the batch interval multiplied by a constant, meaning that if your batch interval is 15 seconds, data will be checkpointed at a multiple of 15 seconds. In mapWithState, for example, which is a stateful stream, you can see that the batch interval is multiplied by 10:
private[streaming] object InternalMapWithStateDStream {
  private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}
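To see the effect end to end, here is a minimal sketch (the local master, socket source, checkpoint path, and word-count logic are my own illustrative choices, not from the original answer): with a 15-second batch interval and the default multiplier of 10, the state stream below is checkpointed roughly every 150 seconds unless you call checkpoint(...) on it yourself.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("MapWithStateCheckpointDemo")
val ssc = new StreamingContext(conf, Seconds(15)) // batch interval = 15 s
ssc.checkpoint("/tmp/checkpoints")                // mapWithState requires a checkpoint directory

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1))

// Running count per word, kept in state across batches.
val spec = StateSpec.function { (word: String, one: Option[Int], state: State[Int]) =>
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
}

// No explicit checkpoint(...) on the stream, so the internal state stream falls
// back to batchInterval * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 150 seconds.
words.mapWithState(spec).print()

ssc.start()
ssc.awaitTermination()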
What will the intervals be for metadata checkpointing, and what for data checkpointing (meaning DStreams)?
If you set the checkpoint duration to 90 seconds on the DStream, then that will be your checkpoint duration, meaning the data gets checkpointed every 90 seconds. You cannot set the checkpoint duration directly on the StreamingContext; all you can do with it is pass the checkpoint directory, since its checkpoint overload only takes a String:
/**
 * Set the context to periodically checkpoint the DStream operations for driver
 * fault-tolerance.
 * @param directory HDFS-compatible directory where the checkpoint
 *                  data will be reliably stored.
 *                  Note that this must be a fault-tolerant file system like HDFS.
 */
def checkpoint(directory: String)
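As a minimal sketch of the two calls side by side (the master, source, and directory path are placeholders): the directory goes on the context, the interval on the individual stream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("CheckpointIntervalDemo")
val ssc = new StreamingContext(conf, Seconds(15))

// On the context you can only hand over the directory...
ssc.checkpoint("hdfs:///user/spark/checkpoints")

// ...while the interval is set per stream: here this stream's RDDs are
// checkpointed every 90 seconds (a multiple of the 15-second batch interval).
val lines = ssc.socketTextStream("localhost", 9999)
lines.checkpoint(Seconds(90))
lines.print()

Note that Spark validates this at start-up: the interval you pass must be a multiple of the stream's slide duration, otherwise the context refuses to start.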
Edit: For updateStateByKey, the checkpoint interval seems to be set to the slide duration multiplied by ceil(Seconds(10) / slideDuration), i.e. the smallest multiple of the slide duration that is at least 10 seconds:
// Set the checkpoint interval to be slideDuration or 10 seconds,
// which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
  checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
  logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
}
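As a sanity check on that formula, here is a small sketch of the arithmetic with a hypothetical helper (autoCheckpointDuration is my own name, written only to mirror the snippet above):

import org.apache.spark.streaming.{Duration, Seconds}

// Smallest multiple of slideDuration that is at least 10 seconds.
def autoCheckpointDuration(slideDuration: Duration): Duration =
  slideDuration * math.ceil(Seconds(10) / slideDuration).toInt

autoCheckpointDuration(Seconds(15)) // ceil(10.0 / 15) = 1 -> 15 s
autoCheckpointDuration(Seconds(4))  // ceil(10.0 / 4)  = 3 -> 12 s
autoCheckpointDuration(Seconds(2))  // ceil(10.0 / 2)  = 5 -> 10 s

So for any slide duration longer than 10 seconds, the state is simply checkpointed on every slide.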