We have a long-running Spark Structured Streaming query reading from Kafka, and we would like it to pick up where it left off after a restart. We have set startingOffsets to "earliest", but what we see after a restart is that the query reads from the start of the Kafka topic again.
Our basic query looks like this:
import org.apache.spark.sql.streaming.StreamingQuery

// Read from Kafka; per the docs, "earliest" should only apply when a new query starts
val extract = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server:port")
  .option("subscribe", "topic")
  .option("startingOffsets", "earliest")
  .load()

// Write with a checkpoint location so the query can resume after a restart
val query: StreamingQuery = extract
  .writeStream
  .option("checkpointLocation", "/tmp/checkpoint/kafka/")
  .foreach(writer)
  .start()
We see the checkpoint directory being created correctly, and the offset files contain the offsets we expect.
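For reference, a minimal sketch of how the offset files can be inspected, assuming the checkpoint lives on the application's default Hadoop filesystem (on YARN that is typically HDFS):

import org.apache.hadoop.fs.{FileSystem, Path}

// Print every file in the checkpoint's offset log. The path below matches the
// checkpointLocation from the query above; adjust if yours differs.
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
fs.listStatus(new Path("/tmp/checkpoint/kafka/offsets")).foreach { status =>
  println(s"--- ${status.getPath} ---")
  val in = fs.open(status.getPath)
  try scala.io.Source.fromInputStream(in).getLines().foreach(println)
  finally in.close()
}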
When we restart we see a message like:
25-07-2017 14:35:32 INFO ConsumerCoordinator:231 - Setting newly assigned partitions [KafkaTopic-2, KafkaTopic-1, KafkaTopic-0, KafkaTopic-3] for group spark-kafka-source-dedc01fb-c0a7-40ea-8358-a5081b961968--1396947302-driver
We are telling the query to start at "earliest", but the documentation for startingOffsets says:

"Note: This only applies when a new streaming query is started, and that resuming will always pick up from where the query left off."
Shouldn't this mean that restarting our application results in the query resuming where it left off?
Setting "group.id" for Kafka is not allowed with Spark Structured Streaming; the Kafka integration guide says "the following Kafka params cannot be set and the Kafka source will throw an exception", and group.id is among them.
We tried adding queryName, in case it was being used to identify the query across runs, but it had no effect (see the sketch below).
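A sketch of that attempt; the name "kafka-extract" is just an illustrative label, not something from our real configuration:

val query: StreamingQuery = extract
  .writeStream
  .queryName("kafka-extract") // hypothetical name; did not change the restart behaviour
  .option("checkpointLocation", "/tmp/checkpoint/kafka/")
  .foreach(writer)
  .start()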
We are using Spark 2.1 on YARN.
Any ideas on why this does not work or what we are doing wrong?
UPDATE WITH LOGS:
Comment from zsxwing: You can use StreamingQuery.recentProgress to print the recent progress, and it should contain the offset information.
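Following that suggestion, a sketch like this should print the offset ranges the query is actually processing; recentProgress returns the last few StreamingQueryProgress updates, and their JSON includes each source's startOffset and endOffset:

// Print recent progress; the JSON includes Kafka startOffset/endOffset per partition
query.recentProgress.foreach(progress => println(progress.json))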