
I have built an application using Apache Kafka and Apache Spark Structured Streaming, and I am facing the issue below.

Scenario:

  • I set up a Spark structured stream with a Kafka topic as the source and a Kafka topic as the sink.
  • I run the stream and produce a number of messages on the source Kafka topic.
  • I stop the stream, clear its checkpoint location, and restart it. After running for 5 to 6 hours, the stream randomly starts consuming old Kafka messages.

After clearing the checkpoint location, I expected the stream to receive only new messages.
Spark version: 2.4.0, Kafka-client version: 2.0.0, Kafka version: 2.0.0, Cluster Manager: Kubernetes.

I have also tried this scenario with a different checkpoint location, but the issue persists.

{
// Build the session; cluster settings come from SparkConf / spark-submit.
SparkConf sparkConf = new SparkConf().setAppName("SparkKafkaConsumer");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

// Source: read from REQUEST_TOPIC.
// "startingOffsets" only applies when the query starts without an existing checkpoint.
// subscribeType is a String defined elsewhere (not shown).
Dataset<Row> stream = spark
        .readStream()
        .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option(subscribeType, "REQUEST_TOPIC")
            .option("failOnDataLoss", false)
            .option("maxOffsetsPerTrigger", "50")
            .option("startingOffsets", "latest")
            .load()
            .selectExpr(
                  "CAST(value AS STRING) as payload",
                  "CAST(key AS STRING)",
                  "CAST(topic AS STRING)",
                  "CAST(partition AS STRING)",
                  "CAST(offset AS STRING)",
                  "CAST(timestamp AS STRING)",
                  "CAST(timestampType AS STRING)");

// Sink: write to RESPONSE_TOPIC.
// Note: the Kafka sink requires a column named "value", so "payload" has to be
// renamed to "value" before writing.
DataStreamWriter<Row> dataWriterStream = stream
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("kafka.max.request.size", "35000000")
            .option("kafka.retries", "5")
            .option("kafka.batch.size", "35000000")
            .option("kafka.receive.buffer.bytes", "200000000")
            .option("kafka.acks", "0")
            .option("kafka.compression.type", "snappy")
            .option("kafka.linger.ms", "0")
            .option("kafka.buffer.memory", "50000000")
            .option("topic", "RESPONSE_TOPIC")
            .outputMode("append")
            .option("checkpointLocation", checkPointDirectory);

// Start the query; without start() nothing is executed.
dataWriterStream.start();
spark.streams().awaitAnyTermination();

}

Could you attach the code, including the Kafka configuration, that you used in the Spark job? - Bartosz Wardziński
This issue is caused by issues.apache.org/jira/browse/SPARK-26267. It is resolved in Spark 2.4.1. - Deepraj B.
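
Since the comment above ties the behaviour to a Spark release, a job could guard against running on an affected version at startup. The following is only a minimal sketch: the class `SparkVersionCheck` and its method `isAtLeast` are hypothetical names introduced here, and the only fact taken from the thread is that 2.4.1 contains the fix.

```java
// Hypothetical helper (not part of Spark): compares a version string such as
// "2.4.0" or "3.0.0-SNAPSHOT" against a minimum major.minor.patch.
final class SparkVersionCheck {

    private SparkVersionCheck() {}

    static boolean isAtLeast(String version, int major, int minor, int patch) {
        // Split on '.' and '-' so suffixes like "-SNAPSHOT" end up in their own part.
        String[] parts = version.split("[.-]");
        int[] have = new int[3]; // missing or non-numeric components count as 0
        for (int i = 0; i < 3 && i < parts.length; i++) {
            try {
                have[i] = Integer.parseInt(parts[i]);
            } catch (NumberFormatException e) {
                break; // stop at the first non-numeric component
            }
        }
        int[] want = {major, minor, patch};
        for (int i = 0; i < 3; i++) {
            if (have[i] != want[i]) {
                return have[i] > want[i];
            }
        }
        return true; // all three components are equal
    }
}
```

In the job from the question, this could be called as `SparkVersionCheck.isAtLeast(spark.version(), 2, 4, 1)` right after the session is created, logging a warning or failing fast when it returns false.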

1 Answer


Check the link below:

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-checkpointing.html

You call SparkContext.setCheckpointDir(directory: String) to set the checkpoint directory, that is, the directory where RDDs are checkpointed. The directory must be an HDFS path if running on a cluster. The reason is that the driver may attempt to reconstruct the checkpointed RDD from its own local file system, which is incorrect because the checkpoint files are actually on the executor machines.
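
The rule quoted above (an HDFS path when running on a cluster) could be checked before the directory is handed to Spark. This is a minimal sketch under assumptions: `CheckpointPaths` and `isHdfsPath` are hypothetical names, the example URIs are made up, and the check only mirrors the quoted statement; it is stricter than Spark itself, which does not perform this validation.

```java
import java.net.URI;

// Hypothetical helper (not part of Spark): reports whether a checkpoint
// directory string uses the hdfs:// scheme.
final class CheckpointPaths {

    private CheckpointPaths() {}

    static boolean isHdfsPath(String dir) {
        if (dir == null) {
            return false;
        }
        try {
            // A bare path such as "/tmp/checkpoints" has no scheme (null).
            String scheme = URI.create(dir).getScheme();
            return "hdfs".equalsIgnoreCase(scheme);
        } catch (IllegalArgumentException e) {
            return false; // not a syntactically valid URI
        }
    }
}
```

A caller might run `CheckpointPaths.isHdfsPath(checkPointDirectory)` before passing the value to `setCheckpointDir` or to the `checkpointLocation` option, and reject local paths such as `/tmp/checkpoints` when the job runs on a cluster.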