2
votes

spark streaming UI Kafka direct consumer started to limit reads to 450 events(5 * 90 partitions) per batch (5 seconds), it was running fine for 1 or 2 days before that (about 5000 to 40000 events per batch)

I'm using spark standalone cluster (spark and spark-streaming-kafka version 1.6.1) running in AWS and using S3 bucket for checkpoint directory StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext), there are not scheduling delays and enough disk space on each worker node.

Didn't change any Kafka client initialization parameters, pretty sure that kafka's structure hasn't changed:

val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

Also can't understand why when direct consumer description says The consumed offsets are by the stream itself I still need to use checkpoint directory when creating the streaming context?

1
Is spark.streaming.backpressure.enabled set to true? - Yuval Itzchakov
yes, I'll try to disable it - stanislav.chetvertkov
looks like it helped - stanislav.chetvertkov
That's good to know :) - Yuval Itzchakov

1 Answers

1
votes

This is usually the result of enabling backpressure via the setting spark.streaming.backpressure.enabled to true. Usually, when the backpressure algorithm sees that there's more data coming in then it's used to, it starts capping each batch to a rather small size until it can "stabilize" itself again. This sometimes has false positives and causes your stream to slow down the processing rate.

If you want to tweak the heuristic a little, there are some undocumented flags it is using (just make sure you know what you're doing):

val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)

If you want the gory details, PIDRateEstimator is what you're looking for.