I define a receiver to read data from Redis.
part of receiver simplified code:
class MyReceiver extends Receiver (StorageLevel.MEMORY_ONLY){
override def onStart() = {
while(!isStopped) {
val res = readMethod()
if (res != null) store(res.toIterator)
// using res.foreach(r => store(r)) the performance is almost the same
}
}
}
My streaming workflow:
val ssc = new StreamingContext(spark.sparkContext, new Duration(50))
val myReceiver = new MyReceiver()
val s = ssc.receiverStream(myReceiver)
s.foreachRDD{ r =>
r.persist()
if (!r.isEmpty) {
some short operations about 1s in total
// note this line ######1
}
}
I have a producer which produce much faster than consumer so that there are plenty records in Redis now, I tested with number 10000. I debugged, and all records could quickly be read after they are in Redis by readMethod()
above. However, in each microbatch I can only get 30 records. (If store is fast enough it should get all of 10000)
With this suspect, I added a sleep 10 seconds code Thread.sleep(10000)
to ######1
above. Each microbatch still gets about 30 records, and each microbatch process time increases 10 seconds. And if I increase the Duration to 200ms, val ssc = new StreamingContext(spark.sparkContext, new Duration(200))
, it could get about 120 records.
All of these shows spark streaming only generate RDD in Duration
? After gets RDD and in main workflow, store
method is temporarily stopped? But this is a great waste if it is true. I want it also generates RDD (store) while the main workflow is running.
Any ideas?