I would like to build a Spark streaming pipeline that reads from multiple Kafka topics (the number of which varies over time). Whenever the topic set needs to change, I intended to stop the streaming job, add/remove topics, and restart the job, using one of the two options outlined in the Spark Structured Streaming + Kafka Integration Guide:
# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
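For context, the stop/update/restart flow I had in mind builds on the snippet above and looks roughly like this (a sketch only; the sink and the updated topic list are placeholders):

# Start the query against the current topic list
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("console") \
  .start()

# When the topic set changes: stop the query, rebuild the source, and restart
query.stop()
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2,topic3") \
  .load()
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("console") \
  .start()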
Upon further investigation, I noticed the following point in the Spark Structured Streaming Programming Guide and am trying to understand why changing the number of input sources is "not allowed":
Changes in the number or type (i.e. different source) of input sources: This is not allowed.
Definition of "not allowed" (also from the Spark Structured Streaming Programming Guide):
The term not allowed means you should not do the specified change as the restarted query is likely to fail with unpredictable errors. sdf represents a streaming DataFrame/Dataset generated with sparkSession.readStream.
My understanding is that Spark Structured Streaming implements its own checkpointing mechanism:
In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the quick example) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when starting a query.
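As I understand it, the checkpoint location is set on the DataStreamWriter, along the lines of the following sketch (the parquet sink and both paths are placeholders; the checkpoint path must be on an HDFS-compatible file system):

# Checkpoint location configured on the DataStreamWriter, as described above
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .outputMode("append") \
  .format("parquet") \
  .option("path", "/data/kafka/output") \
  .option("checkpointLocation", "/data/kafka/checkpoint") \
  .start()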
Can someone please explain why changing the number of sources is "not allowed"? I would have assumed that surviving such a change would be one of the benefits of the checkpointing mechanism.