0
votes

We are using a custom Spark receiver that reads streamed data from a provided HTTP link. If the link is incorrect, the receiver fails. The problem is that Spark then restarts the receiver over and over, so the application never terminates. How can we tell Spark to terminate the application if the receiver fails?

This is an extract of our custom receiver:

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  private def receive(): Unit = {
    ....
    val response: CloseableHttpResponse = httpclient.execute(req)
    try {
      val sl = response.getStatusLine()
      if (sl.getStatusCode != 200){
        val errorMsg = "Error: " + sl.getStatusCode 
        val thrw = new RuntimeException(errorMsg)
        stop(errorMsg, thrw)
      } else {
      ...
        store(doc)
      }

We have a spark streaming application that uses this receiver:

val ssc = new StreamingContext(sparkConf, duration)
val changes = ssc.receiverStream(new CustomReceiver(...
...
ssc.start()
ssc.awaitTermination()

Everything works as expected as long as the receiver has no errors. If the receiver fails (e.g. because of a wrong HTTP link), Spark keeps restarting it and the application never terminates.

16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.

We just want to terminate the whole application if a receiver fails.

2
sadly, now deprecated - Jake

2 Answers

2
votes

There is a way to control the life cycle of Spark Streaming applications that are based on a custom receiver. Define a job progress listener for your application and use it to keep track of what is happening.

class CustomReceiverListener extends StreamingJobProgressListener {
    // volatile: written by the listener thread, read by the driver's monitor thread
    private volatile boolean receiverStopped = false;

    public CustomReceiverListener(StreamingContext ssc) { super(ssc);}

    public boolean isReceiverStopped() {
        return receiverStopped;
    }
    @Override
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
        LOG.info("Update the flag field");
        this.receiverStopped = true;
    }
}

Then, in your driver, start a thread that monitors the receiverStopped flag. The driver stops the streaming application once this thread has finished. (A better approach is to have the driver define a callback method that stops the streaming application.)

CustomReceiverListener listener = new CustomReceiverListener(ssc);
ssc.addStreamingListener(listener);
ssc.start();
Thread thread = new Thread(() -> {
    while (!listener.isReceiverStopped()) {
        LOG.info("Sleepy head...");
        try {
            Thread.sleep(2 * 1000); /*check after 2 seconds*/
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag and stop polling
            break;
        }
    }
});
thread.start();
thread.join();
LOG.info("Listener asked to die! Going to commit suicide :(");
ssc.stop(true, false);
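
The callback idea mentioned above could look roughly like the following. This is only a sketch: ReceiverStopSignal is a hypothetical helper, not part of Spark. The assumption is that the listener's onReceiverStopped calls signalStopped(), and that the driver blocks on awaitStopped() instead of polling the flag every 2 seconds. The main method uses a plain thread as a stand-in for the listener, so it runs without Spark.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Spark): a one-shot signal the driver can block on.
final class ReceiverStopSignal {
    private final CountDownLatch latch = new CountDownLatch(1);

    // Intended to be called from the listener's onReceiverStopped callback.
    void signalStopped() {
        latch.countDown();
    }

    // Blocks until signalled or until the timeout elapses.
    // Returns true only if the signal arrived; false on timeout or interrupt.
    boolean awaitStopped(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ReceiverStopSignal signal = new ReceiverStopSignal();
        // Stand-in for the listener thread reporting a stopped receiver.
        Thread listenerThread = new Thread(signal::signalStopped);
        listenerThread.start();
        boolean stopped = signal.awaitStopped(5, TimeUnit.SECONDS);
        System.out.println("receiver stopped: " + stopped);
        // In the driver, this is the point where ssc.stop(true, false) would be called.
    }
}
```

Compared with the polling loop, the driver wakes up as soon as the signal arrives and there is no shared flag whose visibility has to be managed by hand.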

Note: If you run multiple instances of your receiver, change the implementation of CustomReceiverListener so that it only reports a stop once all receiver instances have stopped.
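
For the multi-receiver case, one possible way to do the bookkeeping is sketched below. ReceiverStopTracker is a hypothetical class, not part of Spark, and the number of expected receivers is assumed to be known to the driver. The idea is that onReceiverStopped passes in the receiver's stream id (which I believe is available via receiverStopped.receiverInfo().streamId(), but check this against your Spark version) and the driver stops the application once markStopped returns true.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of Spark): remembers which receivers have stopped.
final class ReceiverStopTracker {
    private final int expectedReceivers;
    // Thread-safe set, since listener callbacks and the driver may run on different threads.
    private final Set<Integer> stoppedStreamIds = ConcurrentHashMap.newKeySet();

    ReceiverStopTracker(int expectedReceivers) {
        if (expectedReceivers <= 0) {
            throw new IllegalArgumentException("expectedReceivers must be positive");
        }
        this.expectedReceivers = expectedReceivers;
    }

    // Records one stopped receiver. A stream id reported twice is counted once.
    // Returns true once every expected receiver has been reported as stopped.
    boolean markStopped(int streamId) {
        stoppedStreamIds.add(streamId);
        return allStopped();
    }

    boolean allStopped() {
        return stoppedStreamIds.size() >= expectedReceivers;
    }
}
```

Because Spark restarts failed receivers, the same stream id can be reported more than once; using a set keeps such repeats from being counted as additional receivers.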

0
votes

It seems that scheduling in Spark Streaming works in such a way that ReceiverTracker keeps restarting a failed receiver until ReceiverTracker itself is stopped.

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L618

To stop ReceiverTracker, we need to stop the whole application. Thus, it seems there is no way to control this process from within the receiver itself.