3 votes

I have a simple structured streaming application that just reads data from one Kafka topic and writes to another.

SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("test");

SparkSession spark = SparkSession
        .builder()
        .config(conf)
        .getOrCreate();

Dataset<Row> dataset = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "start")
        .load();

StreamingQuery query = dataset
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("checkpointLocation", "checkpoint")
        .option("topic", "end")
        .start();

query.awaitTermination(20000);

There are two messages waiting to be processed on the topic start. This code runs without exceptions, but no messages ever end up on the topic end. What is wrong with this example?


1 Answer

0 votes

The problem is that the messages were already on the topic before the query started and the starting offset was not set to "earliest". For streaming queries, startingOffsets defaults to "latest", so the source only reads messages that arrive after the query starts. Setting it explicitly fixes this:

Dataset<Row> dataset = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", start.getTopicName())
        .option("startingOffsets", "earliest")
        .load();
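
If you want to confirm that the two messages actually made it across, you can read the end topic back with a plain batch query after the streaming query has finished. This is just a minimal sketch against the same local broker, using the topic name end from the question:

Dataset<Row> result = spark
        .read()                       // batch read, not readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "end")
        .option("startingOffsets", "earliest")
        .load();

// Kafka keys and values are binary; cast them to strings before printing.
result.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .show(false);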