13 votes

In Spark Streaming it is possible (and mandatory if you are going to use stateful operations) to set the StreamingContext to perform checkpoints into reliable data storage (S3, HDFS, ...) of both:

  • Metadata
  • DStream lineage

As described here, to set the output data storage you need to call yourSparkStreamingCtx.checkpoint(datastoreURL).

On the other hand, it is possible to set a lineage checkpoint interval for each DStream by calling checkpoint(timeInterval) on it. In fact, it is recommended to set the lineage checkpoint interval to between 5 and 10 times the DStream's sliding interval:

dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
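Putting the two settings together, a minimal configuration sketch might look as follows (the application name, HDFS path, and socket source are hypothetical placeholders, not from the question):

```scala
// Sketch of the two checkpoint settings discussed above: the context-level
// checkpoint directory, and a per-DStream lineage checkpoint interval.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("CheckpointExample") // hypothetical app name
val ssc = new StreamingContext(conf, Seconds(2))           // 2 s batch interval

// Context-level: where metadata and lineage checkpoints are written.
ssc.checkpoint("hdfs://namenode:8020/checkpoints")         // hypothetical path

val lines = ssc.socketTextStream("localhost", 9999)        // hypothetical source
// With a 2 s sliding interval, the recommended 5-10x factor
// gives a 10-20 s lineage checkpoint interval.
lines.checkpoint(Seconds(10))
```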

My question is:

When the streaming context has been set to perform checkpointing, and no ds.checkpoint(interval) is called, is lineage checkpointing enabled for all data streams with a default checkpointInterval equal to the batchInterval? Or, on the contrary, is only metadata checkpointing enabled?

How can you say that in Streaming checkpointing is enabled by default? I know that it remembers the data and its lineage, but by default I do not see any checkpointing being enabled... which means that if your driver fails or some node goes down, then you may lose the data residing on that node, provided there is no replication (using "_2" of StorageLevel) being done. – Sumit
@Sumit I didn't say that at all. What I asked is whether, once you enable checkpointing by calling strmCtx.checkpoint("hdfs://..."), it also enables checkpointing for all data streams with an update interval equal to the context's batch interval. – Pablo Francisco Pérez Hidalgo
Checkpointing is enabled for the whole StreamingContext, so all DStreams created from the same context will enjoy the benefits of checkpointing. – Sumit
@Sumit Not all; as described in my self-answer, only two kinds of DStreams enable checkpointing when the StreamingContext has been set to perform checkpoints. – Pablo Francisco Pérez Hidalgo

1 Answer

11 votes

Checking the Spark code (v1.5), I found that a DStream's checkpointing is enabled under two circumstances:

By an explicit call to its checkpoint method (not StreamingContext's):

/**
 * Enable periodic checkpointing of RDDs of this DStream
 * @param interval Time interval after which generated RDD will be checkpointed
 */
def checkpoint(interval: Duration): DStream[T] = {
  if (isInitialized) {
    throw new UnsupportedOperationException(
      "Cannot change checkpoint interval of an DStream after streaming context has started")
  }
  persist()
  checkpointDuration = interval
  this
}

At DStream initialization, as long as the concrete DStream subclass has overridden the mustCheckpoint attribute (setting it to true):

private[streaming] def initialize(time: Time) {
  ...
  // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }
  ...
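The default-interval formula above rounds the checkpoint interval up to the smallest multiple of the sliding interval that is at least 10 seconds. A sketch of that arithmetic in plain Scala (re-implemented here with Long millisecond values, not Spark's Duration type):

```scala
// Re-implementation of the default-interval formula from DStream.initialize
// (Spark v1.5): slideDuration * ceil(10 seconds / slideDuration).
def defaultCheckpointInterval(slideMs: Long): Long = {
  val tenSecondsMs = 10000L
  slideMs * math.ceil(tenSecondsMs.toDouble / slideMs).toInt
}

// A 3 s sliding interval is rounded up to 12 s (4 slides, the smallest
// multiple of 3 s that is >= 10 s); a 15 s sliding interval already
// exceeds 10 s, so a single slide is used.
println(defaultCheckpointInterval(3000))   // 12000
println(defaultCheckpointInterval(15000))  // 15000
```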

The first case is obvious. For the second, performing a naive analysis of the Spark Streaming code:

grep "val mustCheckpoint = true" $(find . -type f -name "*.scala")

> ./org/apache/spark/streaming/api/python/PythonDStream.scala:  override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala:  override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/StateDStream.scala:  override val mustCheckpoint = true

I can find that, in general (ignoring PythonDStream), a StreamingContext checkpoint only enables lineage checkpointing for StateDStream and ReducedWindowedDStream instances. These instances are, respectively, the result of the transformations:

  • updateStateByKey: that is, a stream that maintains state across several windows.
  • reduceByKeyAndWindow (in the variant taking an inverse reduce function).
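As an illustration, here is a sketch (with a hypothetical socket source and word-count logic, not from the question) of the two transformations whose resulting DStreams set mustCheckpoint = true:

```scala
// Sketch of the two transformations that produce DStreams with
// mustCheckpoint = true, and therefore get automatic lineage
// checkpointing once the StreamingContext checkpoint directory is set.
import org.apache.spark.streaming.{Seconds, StreamingContext}

def wire(ssc: StreamingContext): Unit = {
  val pairs = ssc.socketTextStream("localhost", 9999).map(w => (w, 1))

  // Produces a StateDStream: state is carried from batch to batch.
  val runningCounts = pairs.updateStateByKey[Int] {
    (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
  }

  // Produces a ReducedWindowedDStream: the inverse reduce function (_ - _)
  // enables incremental window updates, which rely on checkpointed data.
  val windowedCounts =
    pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))

  runningCounts.print()
  windowedCounts.print()
}
```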