
I am using Spark Structured Streaming to read from a Kafka topic (say topic1) and a Kafka sink to write to another topic (topic1-result). I can see that the messages are not removed from topic1 after they have been written to the other topic through the sink.

import org.apache.spark.sql.SparkSession

// spark is predefined in spark-shell; in a standalone app create it explicitly
// (the application name below is just an example)
val spark = SparkSession
  .builder()
  .appName("topic1-to-topic1-result")
  .getOrCreate()

// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .load()

// Sink to another topic (topic1-result)
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("checkpointLocation", "/tmp/checkpoint1")
  .option("topic", "topic1-result")
  .start()

The documentation says we cannot use auto-commit with Structured Streaming:

enable.auto.commit: Kafka source doesn’t commit any offset.

But how do I acknowledge the messages and remove the processed messages from the topic (topic1)?

You should not use /tmp as a checkpoint location. Ideally, you'd use S3 or HDFS or similar - OneCricketeer
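Building on that comment, here is a minimal sketch of the same sink with a durable checkpoint location; the HDFS path is purely illustrative and assumes such storage is available:

// Sketch: same sink as in the question, but checkpointing to durable storage
// instead of /tmp (the path below is an example, not a required value)
val durableSink = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("checkpointLocation", "hdfs:///user/spark/checkpoints/topic1-result")
  .option("topic", "topic1-result")
  .start()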

1 Answer


Two considerations:

  1. Messages are not removed from Kafka once you have committed. When your consumer commits, Kafka advances the committed offset of the topic for the consumer group that has been created, but the messages remain in the topic until the retention time you configured for the topic expires (see the retention sketch after this list).

  2. Indeed, the Kafka source does not commit any offsets back to Kafka. Instead, the stream stores the offset that points to the next message in the streaming query's checkpoint directory, so when your stream restarts it picks up the last stored offset and consumes from there (see the restart sketch below).
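Regarding point 1, here is a minimal sketch of inspecting and changing the topic's retention with Kafka's AdminClient; the retention value is only an example and this assumes you are allowed to alter topic configs on that cluster:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.put("bootstrap.servers", "host1:port1")
val admin = AdminClient.create(props)

val topic1 = new ConfigResource(ConfigResource.Type.TOPIC, "topic1")

// Read the current retention: the broker deletes messages older than this,
// regardless of what any consumer group has committed.
val currentRetention = admin.describeConfigs(Collections.singleton(topic1))
  .all().get().get(topic1).get("retention.ms").value()
println(s"retention.ms = $currentRetention")

// Optionally shorten the retention (example value: 1 day) so old messages are
// deleted sooner; committing offsets never deletes messages by itself.
val setRetention = new AlterConfigOp(new ConfigEntry("retention.ms", "86400000"), AlterConfigOp.OpType.SET)
admin.incrementalAlterConfigs(
  Collections.singletonMap[ConfigResource, java.util.Collection[AlterConfigOp]](
    topic1, Collections.singletonList(setRetention))
).all().get()

admin.close()

The broker removes records based on retention (or log compaction), never because a consumer committed or a downstream sink finished writing.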
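Regarding point 2, here is a minimal sketch of a restart using the question's own topics and checkpoint path. Because checkpointLocation points at the same directory, the query resumes from the offsets stored there; the startingOffsets option (shown only for illustration) is honored only on the very first run, before any checkpoint exists:

// Sketch: restarting the query from the question. The same checkpointLocation
// makes the query resume from the offsets stored under /tmp/checkpoint1,
// not from offsets committed to Kafka.
val resumed = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest") // only applies on the first run, before a checkpoint exists
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("checkpointLocation", "/tmp/checkpoint1") // same checkpoint dir => resume from its stored offsets
  .option("topic", "topic1-result")
  .start()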