1
votes

When I start my Spark Structured Streaming 3.0.1 application from the latest offset it works well. But when I want to start from some earlier offsets - for example:

  • startingOffsets to "earliest"
  • startingOffsets to particular offset like {"MyTopic-v1":{"0":1686734237}}

I can see in the logs that the starting offset gets picked up correctly, but then a series of seeks is happening (starting from my defined position) until it reaches the current latest offset.

I dropped my checkpoint directory and tried several options but the scenario is always the same - it reports correct starting offset, but then takes a very long time just to slowly seek to the most recent and start processing - any idea why and what I should additionally check?

2021-02-19 14:52:23 INFO  KafkaConsumer:1564 - [...] Seeking to offset 1786734237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO  KafkaConsumer:1564 - [...] Seeking to offset 1786734737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO  KafkaConsumer:1564 - [...] Seeking to offset 1786735237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO  KafkaConsumer:1564 - [...] Seeking to offset 1786735737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO  KafkaConsumer:1564 - [...] Seeking to offset 1786736237 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO  KafkaConsumer:1564 - [...] Seeking to offset 1786736737 for partition MyTopic-v1-0
2021-02-19 14:52:23 INFO  KafkaConsumer:1564 - [...] Seeking to offset 1786737237 for partition MyTopic-v1-0

I left the application running for longer time and it started producing the files eventually, but my processing trigger of 100 seconds was not met, the data showed up much later - after 20-30minutes.

(I tested it also on spark 2.4.5 - the same problem - maybe it's some kafka configuration?)

1
Could it be that while "takes a very long time just to slowly seek to the most recent" it is actually processing the data? Otherwise, there is nothing wrong with using the startingOffsets as you did. Maybe share a minimal reproducible example.mike
this is actually possible - I left the application running for longer time and it started producing the files eventually, but my processing trigger of 100 seconds was not met, the data showed up much later - after 20-30minutesPiotr Reszke
Thank you @mike! The documentation is not so clear about this parameter - I guess this could be corrected as maxOffsetsPerTrigger is described more like a "rate limit"Piotr Reszke
@mike please post your answer here, I'll mark it as correctPiotr Reszke

1 Answers

1
votes

Using the option startingOffsets with a JSON object as you showed should work perfectly fine.

What you have observed, is that on the first start of the application the Structured Streaming job will read all(!) offsets from the provided (1686734237) until the last available offset in the topic. As this can be quite a high number of messages the processing of that big chunk will keep the first micro-batch quite busy.

Remember that the Trigger option just defines the triggering frequency of the micro-batch. You should make sure to align this Trigger rate with the expected processing time. I see basically two options here:

  • use option maxOffsetsPerTriger to limit the amount of offsets fetched from Kafka per Trigger / Micro-Batch
  • Avoid using any Trigger as this will allow your stream, by default, to trigger as soon as the previous trigger has finished processing the data