10
votes

We have a Spark Streaming application running on Spark 2.3.3.

Basically, it opens a Kafka stream:

    kafka_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "mykafka:9092") \
        .option("subscribe", "mytopic") \
        .load()

The Kafka topic has two partitions. After loading, we apply some basic filtering operations, some Python UDFs, and an explode() on a column:

    stream = apply_operations(kafka_stream)

where apply_operations does all the work on the data.
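As a rough, hypothetical sketch (the real function is not shown here; the UDF and column names are made up for illustration), apply_operations could look something like this:

    from pyspark.sql.functions import col, explode, split, udf
    from pyspark.sql.types import StringType

    # Hypothetical UDF standing in for the real Python logic.
    normalize = udf(lambda s: s.strip().lower() if s else None, StringType())

    def apply_operations(kafka_stream):
        return (kafka_stream
                # Kafka delivers the payload as binary; cast it to a string first.
                .selectExpr("CAST(value AS STRING) AS value")
                # Basic filtering: drop empty payloads.
                .filter(col("value").isNotNull())
                # One of the Python UDFs mentioned above.
                .withColumn("value", normalize(col("value")))
                # explode() on a column: one output row per comma-separated token.
                .withColumn("token", explode(split(col("value"), ","))))

In the end, we would like to write the stream to a sink, i.e.: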

    stream.writeStream \
        .format("our.java.sink.Class") \
        .option("some-option", "value") \
        .trigger(processingTime='15 seconds') \
        .start()

To let this stream operation run forever, we call at the end:

    spark.streams.awaitAnyTermination()

So far, so good. Everything ran for days. But due to a network problem, the job was down for a few days, and there are now millions of messages in the Kafka topic waiting to be caught up on.

When we restart the streaming job using spark-submit, the first batch is far too large and takes ages to complete. We thought there might be a way to limit the size of the first batch with some parameter, but we did not find anything that helped.

We tried:

  • spark.streaming.backpressure.enabled=true together with spark.streaming.backpressure.initialRate=2000, spark.streaming.kafka.maxRatePerPartition=1000, and spark.streaming.receiver.maxRate=2000

  • setting spark.streaming.backpressure.pid.minRate to a lower value did not have an effect either

  • setting option("maxOffsetsPerTrigger", 10000) on the reader did not have an effect either (see the sketch after this list for where we set it)
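For clarity, here is roughly where those settings went (the values are illustrative): the spark.streaming.* properties were passed as --conf flags to spark-submit, and maxOffsetsPerTrigger was set as an option on the reader:

    kafka_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "mykafka:9092") \
        .option("subscribe", "mytopic") \
        .option("maxOffsetsPerTrigger", 10000) \
        .load()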

Now, after we restart the pipeline, sooner or later the whole Spark job crashes again. We cannot simply give the Spark job more memory or more cores.

Is there anything we missed that would let us control the number of events processed in one streaming batch?

Have you tried batch processing? - patilnitin
You mean, rewrite the streaming application as a "normal" batch application? My hope was that one could avoid a rewrite and just tune some parameters. - Regenschein
backpressure.initialRate doesn't work with Structured Streaming, but maxOffsetsPerTrigger should work. How many messages does it gather for the first batch? - cronoik
@cronoik when I looked into the Jobs tab of the Spark UI, there were millions of messages collected and still millions of records to be written to the sink. Usually, when the stream is up and running, it processes around 2000 messages per trigger. But when the stream is behind, it seems to take all available messages, regardless of any of the options we tried. - Regenschein
Just to be sure, you set maxOffsetsPerTrigger on the readStream? Can you please try the following in a PySpark shell: kafka_stream = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "mykafka:9092").option("subscribe", "mytopic").option('maxOffsetsPerTrigger', 4).load() and query = kafka_stream.writeStream.queryName("bla").outputMode("append").format("console").start() How many rows do you get? - cronoik

1 Answer

5
votes

You wrote in the comments that you are using spark-streaming-kafka-0-8_2.11, and that API version cannot handle maxOffsetsPerTrigger (or, as far as I know, any other mechanism for reducing the number of consumed messages), as it was only implemented for the newer API spark-streaming-kafka-0-10_2.11. According to the documentation, this newer API also works with your Kafka version 0.10.2.2.
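For what it's worth, a sketch of the dependency swap at submit time could look like this (the Maven coordinates assume Spark 2.3.3 built against Scala 2.11, and your_job.py is a placeholder; note that the Structured Streaming reader used in the question, readStream.format("kafka"), ships in the spark-sql-kafka-0-10 artifact):

    # Replace the old 0-8 connector with a 0-10 one when submitting the job.
    # Coordinates assume Spark 2.3.3 / Scala 2.11 -- adjust to your build.
    spark-submit \
      --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.3 \
      your_job.py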