
I'm looking into storing Kafka offsets inside Kafka itself for Spark Structured Streaming, the way it works for DStreams with stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges). Is it supported for Structured Streaming? If so, how can I achieve it?

I know about HDFS checkpointing using .option("checkpointLocation", checkpointLocation), but I'm interested specifically in built-in offset management.

I'm expecting Kafka to store the offsets entirely on its own, without a Spark HDFS checkpoint.
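For reference, the checkpoint-based approach mentioned above is a configuration sketch along these lines in the Java API (the broker address, topic name, and checkpoint path are placeholder values, not from the original question):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CheckpointExample {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("offsets-demo").getOrCreate();

        Dataset<Row> stream = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
                .option("subscribe", "my-topic")                     // hypothetical topic name
                .load();

        stream.writeStream()
                .format("console")
                // Structured Streaming tracks its offsets here, not in Kafka
                .option("checkpointLocation", "/tmp/checkpoints/offsets-demo")
                .start();
    }
}
```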


2 Answers

0
votes

I am using this piece of code found somewhere.

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OffsetManager {

    private final String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - Offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {

        // try-with-resources closes the writer even if the write fails
        try (BufferedWriter bufferedWriter =
                     new BufferedWriter(new FileWriter(storageName(topic, partition), false))) {

            bufferedWriter.write(Long.toString(offset));
            bufferedWriter.flush();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return The last stored offset + 1 for the provided topic and partition,
     *         or 0 if no offset has been stored yet.
     */
    long readOffsetFromExternalStore(String topic, int partition) {

        try (Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)))) {

            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;

        } catch (Exception e) {
            e.printStackTrace();
        }

        return 0;
    }

    // Note: uses a Windows-style path separator; the "Offsets" directory must exist.
    private String storageName(String topic, int partition) {
        return "Offsets\\" + storagePrefix + "-" + topic + "-" + partition;
    }

}
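The save/read round-trip at the heart of this class can be exercised standalone like this (a sketch using java.nio directly, with a temporary file standing in for the Offsets directory):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class OffsetRoundTrip {
    public static void main(String[] args) throws Exception {
        // Temp file stands in for OffsetManager's "Offsets\prefix-topic-partition" path.
        Path store = Files.createTempFile("consumer-my-topic-0", ".txt");

        long lastProcessed = 41L;

        // Save: overwrite the file with the last successfully processed offset.
        Files.write(store, Long.toString(lastProcessed).getBytes());

        // Read: last stored offset + 1 is where consumption should resume.
        List<String> lines = Files.readAllLines(store);
        long resumeFrom = Long.parseLong(lines.get(0)) + 1;

        System.out.println(resumeFrom); // prints 42
    }
}
```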

saveOffsetInExternalStore is called after the record processing succeeds; otherwise no offset is stored. Since I am using Kafka topics as the source, I specify startingOffsets as the offsets retrieved from readOffsetFromExternalStore.
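Feeding the retrieved offsets back into the Kafka source goes through the startingOffsets option, which accepts a JSON document mapping topic to partition to offset. A sketch of assembling that string for a single topic (the topic name, offsets, and helper name here are hypothetical, not part of the answer above):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class StartingOffsetsJson {
    // Build the JSON value expected by .option("startingOffsets", ...),
    // e.g. {"my-topic":{"0":42,"1":17}}
    static String startingOffsetsJson(String topic, Map<Integer, Long> offsetsByPartition) {
        String partitions = offsetsByPartition.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":" + e.getValue())
                .collect(Collectors.joining(","));
        return "{\"" + topic + "\":{" + partitions + "}}";
    }

    public static void main(String[] args) {
        // TreeMap keeps partitions in ascending order.
        Map<Integer, Long> offsets = new TreeMap<>();
        offsets.put(0, 42L); // e.g. readOffsetFromExternalStore("my-topic", 0)
        offsets.put(1, 17L);
        System.out.println(startingOffsetsJson("my-topic", offsets));
        // prints {"my-topic":{"0":42,"1":17}}
    }
}
```

The resulting string would then be passed as the startingOffsets option when creating the Kafka source.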

0
votes

"Is it supported for Structured Streaming?"

No, committing offsets back to Kafka is not supported in Structured Streaming, in contrast to what Spark Streaming (DStreams) allows. The Spark Structured Streaming + Kafka Integration Guide is very precise about this in its section on Kafka-specific configurations:

"Kafka source doesn’t commit any offset."

I have written a more comprehensive answer about this in How to manually set groupId and commit Kafka offsets in Spark Structured Streaming.