0
votes

Background

I am new to Spark Streaming and a fairly novice with Scala and Spark.

  • I have a Java big-data wrapper application that takes data input and generates libsvm and/or CSV format data. This app is independent of Spark.

  • The function I am developing allows the Java app to open a socket, connect to a Spring Boot app on a Spark master node, instruct that app to open a Spark stream, and then stream its data to Spark.

  • Once the data is streamed, the Java app shuts down.

  • Most of this is working fine, but I am unable to shut down the Spark streaming context, so once the Java side has shut down, I get a non-stop stream of:

ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Connection Refused

There is an end-of-stream signal that is read by the DStream. I have confirmed that it is received and parsed.

Problem

However, despite having read the documentation, I cannot find a way to shut down the StreamingContext programmatically. Indeed, I have read online that calling StreamingContext.stop(true, true) can lead to problems.

My code is below. Any help would be deeply appreciated.

(NOTE: logger.info("Stopping") is never logged to file.)

var stop = false

@throws(classOf[IKodaMLException])
def startStream(ip: String, port: Int): Unit = {
  try {
    val ssc = getSparkStreamingContext(fieldVariables)
    ssc.checkpoint("./ikoda/cp")

    val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
    lines.print()

    val lmap = lines.map { l =>
      if (l.contains("IKODA_END_STREAM")) {
        stop = true
      }
      // ..... do stuff and return processed line
    }

    if (stop) {
      logger.info("Stopping")
      ssc.stop(true, true)
    }

    lmap.foreachRDD { r =>
      if (r.count() > 0) {
        // ....... do more stuff
      }
      else {
        logger.info("Empty RDD. No data received")
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
  catch {
    case e: Exception =>
      logger.error(e.getMessage, e)
      throw new IKodaMLException(e.getMessage, e)
  }
}

1 Answer

0
votes

UPDATE: This answer seems to be factually correct but conceptually wrong. I think a better answer is provided in this post.


This answer is a bit preliminary. It answers the question but does not solve the problem. Other input is most welcome.

  • First, the documentation states that shutting down programmatically is OK. However, I have noticed one or two connection-related exceptions thrown before shutdown, and even when the SparkContext is told to shut down along with the stream, it does not seem to do so. So shutting down programmatically seems unwise. Unless I can restart the stream, the project is moot.

  • Second, the only code that is applied to the StreamingContext during streaming is code directly referencing the DStream, so the stop() call in the code above (in the question) was obviously wrong: it runs once during setup, not per batch.

  • Third, streaming (as I understand it) is driven from the driver, so field variables can be created and then referenced inside DStream maps, loops, etc.

  • It is possible to create a Thread that monitors a field-level boolean for a shutdown signal and, when it is set, calls stop() on the StreamingContext.

The Thread

var stopScc = false

private def stopSccThread(): Unit = {
  val thread = new Thread {
    override def run(): Unit = {
      var continueRun = true
      while (continueRun) {
        logger.debug("Checking status")
        if (stopScc) {
          getSparkStreamingContext(fieldVariables).stop(true, true)
          logger.info("Called Stop on Streaming Context")
          continueRun = false
        }
        Thread.sleep(50)
      }
    }
  }
  thread.start()
}
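One caveat worth noting (my own addition, not from the original answer): stopScc is written from the map function and read from the watcher thread, so without @volatile or an AtomicBoolean the write may never become visible to the watcher; and in cluster mode the map closure runs on executors, so a plain driver-side var would not be updated at all. Below is a minimal, Spark-free sketch of the same polling pattern using AtomicBoolean; WatcherSketch, startWatcher, and onStop are illustrative names, and onStop stands in for the real ssc.stop(true, true):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Spark-free sketch of the watcher pattern above. AtomicBoolean (or a
// @volatile var) guarantees that a write from one thread becomes
// visible to the polling thread.
object WatcherSketch {
  val stopScc = new AtomicBoolean(false)

  // Starts a daemon-style polling thread; `onStop` stands in for the
  // real ssc.stop(true, true) call.
  def startWatcher(onStop: () => Unit): Thread = {
    val t = new Thread {
      override def run(): Unit = {
        while (!stopScc.get()) Thread.sleep(10) // poll the flag
        onStop()                                // flag set: shut down
      }
    }
    t.start()
    t
  }
}
```

Calling WatcherSketch.stopScc.set(true) from the stream-processing code would then trigger onStop exactly once, after which the watcher thread exits.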

The Stream

@throws(classOf[IKodaMLException])
def startStream(ip: String, port: Int): Unit = {
  try {
    val ssc = getSparkStreamingContext(fieldVariables)
    ssc.checkpoint("./ikoda/cp")

    val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
    lines.print()

    val lmap = lines.map { l =>
      if (l.contains("IKODA_END_STREAM")) {
        stopScc = true
      }
      l
    }

    lmap.foreachRDD { r =>
      if (r.count() > 0) {
        logger.info(s"RECEIVED: ${r.toString()} first: ${r.first().toString}")
        r.saveAsTextFile("./ikoda/test/test")
      }
      else {
        logger.info("Empty RDD. No data received")
      }
    }
    ssc.start()

    ssc.awaitTermination()
  }
  catch {
    case e: Exception =>
      logger.error(e.getMessage, e)
      throw new IKodaMLException(e.getMessage, e)
  }
}
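As an alternative to a dedicated watcher thread, the driver can poll the flag itself using StreamingContext.awaitTerminationOrTimeout(timeout), which returns true once the context has terminated. The sketch below shows the loop; SscStub is a hypothetical stand-in for the real StreamingContext (exposing only the two methods the loop uses) so the example runs standalone — in real code you would pass the actual ssc:

```scala
// Hypothetical stand-in for StreamingContext so this compiles and runs
// without Spark; replace with the real ssc in production code.
class SscStub {
  @volatile var stopped = false
  def awaitTerminationOrTimeout(ms: Long): Boolean = { Thread.sleep(ms); stopped }
  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { stopped = true }
}

object GracefulStop {
  @volatile var stopScc = false

  // Poll until either the context terminates on its own or the
  // shutdown flag is set, in which case stop it gracefully.
  def awaitStop(ssc: SscStub, pollMs: Long = 50): Unit = {
    var terminated = false
    while (!terminated) {
      terminated = ssc.awaitTerminationOrTimeout(pollMs)
      if (!terminated && stopScc) {
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        terminated = true
      }
    }
  }
}
```

This replaces the blocking ssc.awaitTermination() call at the end of startStream, so no extra thread is needed; the trade-off is that shutdown latency is bounded by the poll interval.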