I have built an application using Apache Kafka and Apache Spark Structured Streaming, and I am facing the issue described below.
Scenario:
- I set up a Spark Structured Streaming query that reads from one Kafka topic and writes to another Kafka topic.
- I run the stream and produce a number of messages on the source topic.
- I stop the stream, clear its checkpoint location, and restart it. After running for 5 to 6 hours, the stream randomly starts consuming old Kafka messages.
Since the checkpoint location was cleared and startingOffsets is set to latest, I expected the stream to pick up only new messages after the restart.
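As I understand it, startingOffsets is only honored when the query starts with no checkpoint; once a checkpoint exists, offsets are taken from it. The Kafka source also accepts an explicit per-partition JSON value, where -1 means latest and -2 means earliest. A minimal sketch of that variant (assuming REQUEST_TOPIC has a single partition 0):

// Pin partition 0 of REQUEST_TOPIC to "latest" explicitly;
// -2 would mean "earliest".
.option("startingOffsets", "{\"REQUEST_TOPIC\":{\"0\":-1}}")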
Spark version: 2.4.0
Kafka client version: 2.0.0
Kafka (broker) version: 2.0.0
Cluster manager: Kubernetes
I have also tried changing the checkpoint location to a fresh path, but the issue still persists.
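For reference, the checkpoint is cleared between runs along these lines (a sketch assuming the checkpoint directory is reachable through the Hadoop filesystem API; the actual cleanup may differ):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Recursively delete the whole checkpoint directory before restarting,
// so no offsets/, commits/ or sources/ subdirectories survive from the
// previous run.
FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
fs.delete(new Path(checkPointDirectory), true);

The streaming job itself: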
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;

SparkConf sparkConf = new SparkConf().setAppName("SparkKafkaConsumer");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

// Source: subscribe to the request topic; with no checkpoint present,
// startingOffsets=latest should begin at the end of the topic.
Dataset<Row> stream = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "REQUEST_TOPIC")
    .option("failOnDataLoss", false)
    .option("maxOffsetsPerTrigger", "50")   // cap records per micro-batch
    .option("startingOffsets", "latest")
    .load()
    .selectExpr(
        "CAST(value AS STRING)",            // the Kafka sink requires a column named "value"
        "CAST(key AS STRING)",
        "CAST(topic AS STRING)",
        "CAST(partition AS STRING)",
        "CAST(offset AS STRING)",
        "CAST(timestamp AS STRING)",
        "CAST(timestampType AS STRING)");

// Sink: write every row to the response topic (the "topic" option
// overrides any topic column); producer settings are passed through
// via the "kafka." prefix.
DataStreamWriter<Row> dataWriterStream = stream
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("kafka.max.request.size", "35000000")
    .option("kafka.retries", "5")
    .option("kafka.batch.size", "35000000")
    .option("kafka.receive.buffer.bytes", "200000000")
    .option("kafka.acks", "0")              // fire-and-forget, no broker ack
    .option("kafka.compression.type", "snappy")
    .option("kafka.linger.ms", "0")
    .option("kafka.buffer.memory", "50000000")
    .option("topic", "RESPONSE_TOPIC")
    .outputMode("append")
    .option("checkpointLocation", checkPointDirectory);

// start() is required; without it the query never runs and
// awaitAnyTermination() returns immediately.
dataWriterStream.start();
spark.streams().awaitAnyTermination();
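To pin down which offsets each micro-batch actually reads after a restart, a StreamingQueryListener can log the Kafka source's start and end offsets per batch. A minimal diagnostic sketch (registered before starting the query):

import org.apache.spark.sql.streaming.SourceProgress;
import org.apache.spark.sql.streaming.StreamingQueryListener;

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) { }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        // startOffset/endOffset are JSON maps of topic -> partition -> offset,
        // so any re-read of old offsets shows up directly in the output.
        for (SourceProgress source : event.progress().sources()) {
            System.out.println("batch " + event.progress().batchId()
                    + ": " + source.startOffset() + " -> " + source.endOffset());
        }
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) { }
});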