To have a simple way of testing the Spark Streaming Write Ahead Log, I created a very simple custom input receiver which generates strings and stores them:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import scala.collection.mutable.ArrayBuffer

class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  val batchID = System.currentTimeMillis()

  def onStart() {
    new Thread("InMemoryStringReceiver") {
      override def run(): Unit = {
        var i = 0
        while(true) {
          //http://spark.apache.org/docs/latest/streaming-custom-receivers.html
          //To implement a reliable receiver, you have to use store(multiple-records) to store data.
          store(ArrayBuffer(s"$batchID-$i"))
          println(s"Stored => [$batchID-$i)]")
          Thread.sleep(1000L)
          i = i + 1
        }
      }
    }.start()
  }

  def onStop() {}
}

I then created a simple application which uses the custom receiver to stream the data and process it:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamResilienceTest extends App {

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("DStreamResilienceTest")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
  val customReceiverStream: ReceiverInputDStream[String] = ssc.receiverStream(new InMemoryStringReceiver())
  customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
    println(s"processed => [${rdd.collect().toList}]")
    Thread.sleep(2000L)
  }
  ssc.start()
  ssc.awaitTermination()

}

As you can see, the processing of each received RDD sleeps for 2 seconds while strings are stored every second. This creates a backlog: new strings pile up and should be stored in the WAL. Indeed, I can see the files in the checkpoint directories getting updated. Running the app I get output like this:

[info] Stored => [1453374654941-0)]
[info] processed => [List(1453374654941-0)]
[info] Stored => [1453374654941-1)]
[info] Stored => [1453374654941-2)]
[info] processed => [List(1453374654941-1)]
[info] Stored => [1453374654941-3)]
[info] Stored => [1453374654941-4)]
[info] processed => [List(1453374654941-2)]
[info] Stored => [1453374654941-5)]
[info] Stored => [1453374654941-6)]
[info] processed => [List(1453374654941-3)]
[info] Stored => [1453374654941-7)]
[info] Stored => [1453374654941-8)]
[info] processed => [List(1453374654941-4)]
[info] Stored => [1453374654941-9)]
[info] Stored => [1453374654941-10)]

As you would expect, the storing outpaces the processing. So I kill the application and restart it, this time with the sleep in the foreachRDD commented out so that the processing can clear any backlog.
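The restarted foreachRDD is simply the original one with the sleep commented out; nothing else changes:

  customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
    println(s"processed => [${rdd.collect().toList}]")
    // Thread.sleep(2000L)   (commented out so the backlog can drain)
  }

On restart I get output like this: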

[info] Stored => [1453374753946-0)]
[info] processed => [List(1453374753946-0)]
[info] Stored => [1453374753946-1)]
[info] processed => [List(1453374753946-1)]
[info] Stored => [1453374753946-2)]
[info] processed => [List(1453374753946-2)]
[info] Stored => [1453374753946-3)]
[info] processed => [List(1453374753946-3)]
[info] Stored => [1453374753946-4)]
[info] processed => [List(1453374753946-4)]

As you can see, the new events are processed but none from the previous batch. The old WAL logs are cleared, and I see log messages like the one below, but the old data does not get processed.

INFO WriteAheadLogManager : Recovered 1 write ahead log files from hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0

What am I doing wrong? I am using Spark 1.5.2.

1 Answer

This was answered by Shixiong(Ryan) Zhu on the Spark Users mailing list.

Using StreamingContext.getOrCreate as he suggested works: on restart the StreamingContext has to be recovered from the checkpoint rather than created from scratch, otherwise the data backed by the WAL is never replayed.
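For completeness, here is a sketch of what the driver code might look like with that change (same names as in the question; the exact structure is mine, not from the mailing-list reply): all setup moves into a factory function, and the context is obtained via StreamingContext.getOrCreate, so a restart recovers the checkpointed context and replays the WAL-backed data instead of building a fresh context.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamResilienceTest extends App {

  val checkpointDir = "hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest"

  // Only called when no checkpoint exists yet. On restart, getOrCreate rebuilds
  // the StreamingContext (including the WAL-backed received blocks) from the
  // checkpoint instead of invoking this function.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DStreamResilienceTest")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    val customReceiverStream = ssc.receiverStream(new InMemoryStringReceiver())
    customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
      println(s"processed => [${rdd.collect().toList}]")
    }
    ssc
  }

  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()
}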