Background
I am new to Spark Streaming and fairly new to Scala and Spark in general.
I have a Java big-data wrapper application that takes input data and generates libsvm- and/or csv-format data. This app is independent of Spark.
The feature I am developing allows the Java app to open a socket, connect to a Spring Boot app on the Spark master node, instruct that app to open a Spark stream, and then stream its data to Spark.
Once the data has been streamed, the Java app shuts down.
Most of this works fine, but I am unable to shut down the Spark streaming context, so once the Java side has shut down I get a non-stop stream of:
ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Connection Refused
There is an end-of-stream signal that is read by the DStream. I have confirmed that it is received and parsed.
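For context, the sending side does roughly the following (a simplified Scala sketch of the Java logic; serveAndClose, dataLines, and the port handling are my placeholders, not the real app). Since socketTextStream connects to this port as a client, the sender acts as the server; once it closes the socket after the sentinel, the receiver's reconnect attempts fail, which matches the Connection Refused errors above:

import java.io.PrintWriter
import java.net.ServerSocket

// Hypothetical sketch of the sender; Spark's receiver connects to this port.
def serveAndClose(port: Int, dataLines: Iterator[String]): Unit = {
  val server = new ServerSocket(port)
  val client = server.accept()                   // the Spark receiver connects here
  val out = new PrintWriter(client.getOutputStream, true)
  dataLines.foreach(out.println)                 // the libsvm/csv lines
  out.println("IKODA_END_STREAM")                // end-of-stream sentinel
  out.close(); client.close(); server.close()    // sender shuts down after this
}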
Problem
However, despite having read the documentation, I am unable to find a way to shut down the StreamingContext programmatically. Indeed, I have read online that calling StreamingContext.stop(true, true) can lead to problems.
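The pattern I keep seeing suggested is to stop the context from a monitoring loop on the driver rather than from inside a transformation, polling a flag with awaitTerminationOrTimeout. A minimal sketch of that idea (StreamMonitor, stopFlag, and checkIntervalMs are my names, not a Spark API; it also assumes the flag is set somewhere the driver can see it, e.g. inside foreachRDD, since closures passed to map run on the executors):

import org.apache.spark.streaming.StreamingContext

object StreamMonitor {
  // set on the driver once the end-of-stream sentinel has been seen
  @volatile var stopFlag = false

  def runUntilStopped(ssc: StreamingContext, checkIntervalMs: Long = 10000L): Unit = {
    ssc.start()
    var stopped = false
    while (!stopped) {
      // returns true once the context has terminated on its own
      stopped = ssc.awaitTerminationOrTimeout(checkIntervalMs)
      if (!stopped && stopFlag) {
        // stop gracefully from the driver thread, outside any DStream code
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        stopped = true
      }
    }
  }
}

I am not sure whether this pattern is correct for my case, or where the flag should be set.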
My code is below. Any help would be deeply appreciated.
(NOTE: logger.info("Stopping") is never logged to file.)
var stop = false

@throws(classOf[IKodaMLException])
def startStream(ip: String, port: Int): Unit = {
  try {
    val ssc = getSparkStreamingContext(fieldVariables)
    ssc.checkpoint("./ikoda/cp")

    val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
    lines.print()

    val lmap = lines.map { l =>
      // flag the end-of-stream sentinel sent by the Java app
      if (l.contains("IKODA_END_STREAM")) {
        stop = true
      }
      .....do stuff and return processed line
    }

    // this is the check that never fires: "Stopping" is never logged
    if (stop) {
      logger.info("Stopping")
      ssc.stop(true, true)
    }

    lmap.foreachRDD { r =>
      if (r.count() > 0) {
        .......do more stuff
      } else {
        logger.info("Empty RDD. No data received")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  } catch {
    // wrap and rethrow (constructor signature assumed)
    case e: Exception => throw new IKodaMLException(e.getMessage, e)
  }
}
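For completeness, I also came across the spark.streaming.stopGracefullyOnShutdown setting, but as I understand it this only takes effect when the JVM shuts down via its shutdown hook, not mid-run, so it may not fit my case. Something like this (a sketch; the app name is a placeholder):

import org.apache.spark.SparkConf

// ask Spark to stop the streaming context gracefully on JVM shutdown
val conf = new SparkConf()
  .setAppName("ikoda-stream")  // placeholder app name
  .set("spark.streaming.stopGracefullyOnShutdown", "true")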