2 votes

I have started using Spark Structured Streaming.

I read a stream from a Kafka topic (startingOffsets: latest) with a watermark, group by event time with a window duration, and write the result to another Kafka topic.
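
Roughly, the pipeline looks like the sketch below (the topic names, watermark, window duration, and the use of the Kafka record timestamp as event time are placeholders, not my real values):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("streaming-agg").getOrCreate()
import spark.implicits._

// read only new records from the input topic
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) AS value", "timestamp AS eventTime")

// watermark + group by event-time window
val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window($"eventTime", "5 minutes"))
  .count()

// write the aggregated result to the output topic
counts
  .selectExpr("CAST(window.start AS STRING) AS key", "CAST(`count` AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/streaming-agg")
  .outputMode("update")
  .start()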

My question is: how can I handle the data that was written to the Kafka topic before the Spark Structured Streaming job started?

I tried running with `startingOffsets: earliest` at first, but the data in the Kafka topic is too large, so the Spark streaming job does not start because of a YARN timeout (even though I increased the timeout value).

1. If I simply create a batch job and filter by a specific date range (roughly like the sketch after this list), the result is not reflected in the current state of the streaming job; there seems to be a problem with the consistency and accuracy of the results.

2. I also tried resetting the checkpoint directory, but it did not work.
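
For reference, the batch attempt from point 1 looked roughly like this (the ending offsets and the date cut-off are made-up placeholders, and it reuses the spark session from the sketch above); its output is produced separately and never enters the streaming query's state:

// one-off batch read of the historical backlog
val history = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("startingOffsets", "earliest")
  // per-partition upper bound for the batch read (placeholder offsets)
  .option("endingOffsets", """{"input-topic":{"0":500000}}""")
  .load()
  .selectExpr("CAST(value AS STRING) AS value", "timestamp AS eventTime")
  .filter($"eventTime" < "2019-01-01")   // keep only the old date range

// the same windowed aggregation, run once as a batch job
val historicalCounts = history
  .groupBy(window($"eventTime", "5 minutes"))
  .count()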

How can I handle this old and large data? Help me.


1 Answer

1 vote

You can try the parameter maxOffsetsPerTrigger for Kafka + Structured Streaming to receive old data from Kafka. Set this parameter to the number of records you want to receive from Kafka in each trigger (micro-batch).

Use:

sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // Kafka broker(s)
      .option("subscribe", "test-name")                     // topic to read
      .option("startingOffsets", "earliest")                // start from the oldest available data
      .option("maxOffsetsPerTrigger", 1)                    // max records per micro-batch
      .option("group.id", "2")
      .option("auto.offset.reset", "earliest")
      .load()
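
With startingOffsets set to earliest, the query begins at the oldest data, and maxOffsetsPerTrigger caps each micro-batch, so the backlog is processed in bounded chunks instead of one enormous first batch. If you assign the DataFrame from the snippet above to a value, say df, the write side might look like the sketch below (the console sink and checkpoint path are placeholders of mine, not part of the original snippet); the processed offsets are recorded in the checkpoint, so once the old data has been worked through the query simply continues with newly arriving records.

// continuing from the DataFrame returned by the snippet above
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint-dir")  // tracks processed offsets across restarts
  .start()
  .awaitTermination()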