
I define a receiver to read data from Redis.

Part of the receiver code, simplified:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  override def onStart(): Unit = {
    while (!isStopped) {
      val res = readMethod()
      if (res != null) store(res.toIterator)
      // using res.foreach(r => store(r)) instead, the performance is almost the same
    }
  }

  override def onStop(): Unit = {}
}
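
For reference, the Spark custom-receiver guide expects onStart() to return immediately and to run the receive loop on its own thread rather than blocking inside onStart() as above. A minimal sketch of that pattern; ThreadedReceiver and the readMethod() stub are placeholders, not code from the question:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ThreadedReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  // Hypothetical stand-in for the question's Redis batch read.
  private def readMethod(): Seq[String] = ???

  override def onStart(): Unit = {
    // onStart() must not block: run the read loop on a daemon thread.
    new Thread("redis-reader") {
      setDaemon(true)
      override def run(): Unit = {
        while (!isStopped) {
          val res = readMethod()
          if (res != null) store(res.toIterator)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {} // the loop exits once isStopped returns true
}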

My streaming workflow:

import org.apache.spark.streaming.{Duration, StreamingContext}

val ssc = new StreamingContext(spark.sparkContext, new Duration(50))
val myReceiver = new MyReceiver()
val s = ssc.receiverStream(myReceiver)
s.foreachRDD { r =>
  r.persist()
  if (!r.isEmpty) {
    // some short operations, about 1 s in total
    // note this line ######1
  }
}
ssc.start()
ssc.awaitTermination()

I have a producer that produces much faster than the consumer, so there are plenty of records in Redis; I tested with 10,000 records. While debugging, I confirmed that readMethod() above reads records quickly as soon as they appear in Redis. However, each microbatch only gets about 30 records. (If store were fast enough, it should get all 10,000.)

To test this suspicion, I added a Thread.sleep(10000) at ######1 above. Each microbatch still gets about 30 records, and each microbatch's processing time increases by 10 seconds. And if I increase the Duration to 200 ms, val ssc = new StreamingContext(spark.sparkContext, new Duration(200)), each microbatch gets about 120 records.
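
Incidentally, both measurements work out to the same per-second throughput: 30 records / 0.05 s = 600 records/s, and 120 records / 0.2 s = 600 records/s. So whatever limits the batch size looks like a per-second cap rather than a per-batch one.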

Does all of this mean Spark Streaming only generates the RDD within each batch Duration, and that once the RDD is handed to the main workflow, the store method is temporarily stopped? If true, this is a great waste. I want it to keep generating RDDs (store) while the main workflow is running.

Any ideas?


1 Answer


I cannot leave a comment simply because I don't have enough reputation. Is it possible that the property spark.streaming.receiver.maxRate is set somewhere in your code?
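
If that property is set, each batch would be capped at roughly maxRate × batch duration, which would match your numbers: 600 records/s gives 30 records per 50 ms batch and 120 per 200 ms batch. A quick read-only check, reusing the spark session from your snippet:

// None means no cap (the default); Some(rate) means each receiver is
// limited to that many records per second.
println(spark.sparkContext.getConf.getOption("spark.streaming.receiver.maxRate"))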