4 votes

We have a long-running Spark Structured Streaming query reading from Kafka, and we would like it to pick up where it left off after a restart. We have set startingOffsets to "earliest", but what we see after every restart is that the query reads from the start of the Kafka topic again.

Our basic query looks like this:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.streaming.StreamingQuery

  // Read from Kafka; "earliest" should only apply to the very first run.
  val extract = sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "server:port")
    .option("subscribe", "topic")
    .option("startingOffsets", "earliest")
    .load()

  // The checkpoint location is where Spark records the offsets committed
  // by each batch; writer is our ForeachWriter (sketched just below).
  val query: StreamingQuery = extract
    .writeStream
    .option("checkpointLocation", "/tmp/checkpoint/kafka/")
    .foreach(writer)
    .start()
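
Here writer is a ForeachWriter; a minimal sketch that compiles (the body is a placeholder, purely for illustration):

  import org.apache.spark.sql.{ForeachWriter, Row}

  // Placeholder sink: accept every partition, print each row, nothing to release.
  val writer = new ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = true
    override def process(record: Row): Unit = println(record)
    override def close(errorOrNull: Throwable): Unit = ()
  }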

We see the checkpoint directory being created correctly, with the offsets we expect in the offset files.

When we restart we see a message like:

25-07-2017 14:35:32 INFO  ConsumerCoordinator:231 - Setting newly assigned partitions [KafkaTopic-2, KafkaTopic-1, KafkaTopic-0, KafkaTopic-3] for group spark-kafka-source-dedc01fb-c0a7-40ea-8358-a5081b961968--1396947302-driver

We are telling the query to start at "earliest" but the documentation says:

This only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off.

Shouldn't this mean that restarting our application results in the query resuming where it left off?

Setting the "group.id" for Kafka is not allowed with Spark Structured Streaming. See this: Note that the following Kafka params cannot be set and the Kafka source will throw an exception.
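
For illustration, this is the kind of thing that gets rejected (the group name here is made up):

  // Sketch: Kafka params are passed with the "kafka." prefix, but a group id
  // is rejected at query start because Spark generates one per query.
  sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "server:port")
    .option("subscribe", "topic")
    .option("kafka.group.id", "my-group")   // throws IllegalArgumentException
    .load()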

I tried adding queryName, in case that was being used to identify the query across runs, but it did not have any effect.
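
For reference, the attempt looked like this (the name itself is illustrative):

  val query: StreamingQuery = extract
    .writeStream
    .queryName("kafka-extract")   // illustrative name; it made no difference
    .option("checkpointLocation", "/tmp/checkpoint/kafka/")
    .foreach(writer)
    .start()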

We are using Spark 2.1 on YARN.

Any ideas on why this does not work or what we are doing wrong?

UPDATE WITH LOGS:

From the Driver

From the Worker

The log means nothing. Spark will create a unique id as the group id for each run. You can use StreamingQuery.recentProgress to print the recent progress (see the sketch after this thread); it should contain the offset information. - zsxwing
That doesn't help much. It runs fine. It just starts at the first offset instead of the last one read when restarted. - Patrick McGloin
Did the previous run have just one batch? If so, before 2.2.0, Spark will always rerun the last batch of the previous run. - zsxwing
It seems to me that Spark doesn't pick up the checkpoint dir at all when it restarts. I see no difference between 1) removing the checkpoint dir and running the application and 2) stopping the application after it has processed all messages and then restarting with a checkpoint dir that looks good. In both cases it starts at the earliest Kafka message. I assume I am doing something wrong but can't figure out what it is. - Patrick McGloin
Do you mind sharing the logs? - zsxwing
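
A minimal sketch of the recentProgress check suggested in the comments, assuming the query object from the question:

  // Each progress entry's "sources" section shows the startOffset/endOffset
  // actually read by that batch, so it reveals where a restarted query began.
  query.recentProgress.foreach(progress => println(progress.json))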

1 Answer

0 votes

First of all, why do you say that the checkpoint directory gets created again? Are you deleting it after your initial run and then resuming?

So, just to be clear: the .option("startingOffsets", "earliest") setting means the query reads from the very beginning only when it is started for the first time. Now suppose something goes wrong and the stream stops. If you fix it and start the stream again (without deleting the checkpoint directory), the stream should start from the offset where it previously stopped.

If you have deleted the checkpoint directory and then resumed the stream, it obviously has no history of the offsets it has read (since you deleted the checkpoint) and will therefore start from the very first (earliest) offset available on Kafka.
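
A minimal sketch of the restart-safe setup described above (the path is illustrative; the point is that the checkpoint location must be durable and identical across runs, and depending on the cluster a /tmp path may not survive between runs):

  // startingOffsets only matters on the very first run; afterwards the
  // committed offsets in the checkpoint win, so never delete this directory
  // if you want the query to resume where it left off.
  val query: StreamingQuery = extract
    .writeStream
    .option("checkpointLocation", "hdfs:///user/app/checkpoints/kafka-extract")
    .foreach(writer)
    .start()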