
When consuming data from Kafka topics, both Flink and Spark Streaming provide a checkpointing mechanism, provided that enable.auto.commit is set to false. The Spark docs say:

Spark output operations are at-least-once. So if you want the equivalent of exactly-once semantics, you must either store offsets after an idempotent output or store offsets in an atomic transaction alongside output.
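The second approach mentioned there can be sketched in plain Python, using SQLite in place of a real sink; the table names and the commit_batch helper are made up for illustration, not Spark API:

```python
# Sketch: store Kafka offsets in the same atomic transaction as the output.
# If the process crashes before COMMIT, neither the results nor the offset
# are persisted, so the batch is re-read and re-processed on restart.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE results (k TEXT PRIMARY KEY, v TEXT)")
conn.execute("CREATE TABLE offsets (topic_partition TEXT PRIMARY KEY,"
             " next_offset INTEGER)")

def commit_batch(conn, records, topic_partition, next_offset):
    # One atomic transaction: output and offset succeed or fail together.
    with conn:
        conn.executemany(
            "INSERT OR REPLACE INTO results (k, v) VALUES (?, ?)", records)
        conn.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?)",
            (topic_partition, next_offset))

commit_batch(conn, [("user-1", "click"), ("user-2", "view")], "events-0", 2)
```

On recovery you read the stored offset back and resume consuming from there; a replayed batch simply overwrites the same rows and offset.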

But Flink docs say:

With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

From reading other sources, I gather that Flink's checkpointing saves the state of the program as well as the consumer offsets, while Spark's checkpointing saves only the consumer offsets, which is why the Spark docs say:

Spark output operations are at-least-once.

Can anyone explain what the differences are, and how one can achieve exactly-once semantics when reading data from Kafka topics?


2 Answers


I think this covers what you are looking for: https://data-artisans.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink

The big difference between exactly-once and at-least-once is that with exactly-once you are guaranteed no duplicate data is output. At-least-once guarantees that you won't lose any data (as does exactly-once), but duplicate data may be output.
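The duplicate-on-recovery behavior can be shown with a toy simulation (no real Kafka involved): after a crash, processing restarts from the last checkpointed offset, so any record processed after that checkpoint is output a second time.

```python
# Toy simulation of at-least-once: replay from the last checkpoint
# re-outputs records that were already emitted before the crash.
records = ["a", "b", "c", "d"]
output = []

checkpointed_offset = 0
for offset, rec in enumerate(records):
    output.append(rec)
    if offset == 1:
        checkpointed_offset = offset + 1  # checkpoint taken after "b"
    if offset == 2:
        break  # simulated crash after outputting "c"

# Recovery: re-read everything from the last checkpointed offset.
for rec in records[checkpointed_offset:]:
    output.append(rec)

print(output)  # "c" appears twice: no data lost, but a duplicate emitted
```

Exactly-once systems close this gap by also restoring operator state and deduplicating (or deferring) the output, not by avoiding the replay itself.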

Edit:

I should mention I am not as familiar with Spark as I am with Flink, but this is a major topic in Flink's documentation, which is why I linked its overview. The concept of exactly-once vs. at-least-once is universal, though, and not technology-dependent.


Can anyone says what is the differences and how someone can reach exactly-once semantic in reading data from Kafka topics?

Exactly-once semantics cannot be achieved on the source side alone. Exactly-once is a property that the whole streaming application needs to support:

  • You need to ensure that all incoming data points are processed (at least once). That's what you get by storing source offsets in your checkpoints. In case of a failure, you re-read the same data and reprocess everything between the last successful checkpoint and the failure.
  • Your application code itself needs to ensure that everything it does in between is deterministic and idempotent. If you trigger an action outside of the streaming framework, that action will be re-triggered during recovery, so calculations need to be consistent across recoveries.
  • Duplicate output will be emitted during recovery. That is inherent to the system and cannot be avoided at the framework level. Hence, for exactly-once output, you have three options:
    1. Defer output until a checkpoint has been written, which is the simplest and most universal solution, but of course introduces significant latency.
    2. Use an idempotent sink. If you write to a KV store and never update any value, duplicates are implicitly filtered.
    3. Use a transactional sink, where duplicates are filtered. That's how Flink and Kafka Streams write into a Kafka topic.
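Option 2 is the easiest to demonstrate. A minimal sketch of an idempotent key-value sink (the KVSink class is made up for illustration): as long as each record deterministically maps to the same key and value, replaying it during recovery is invisible to readers.

```python
# Sketch of an idempotent KV sink: rewriting the same key with the same
# deterministic value is a no-op, so replayed records are implicitly
# deduplicated.
class KVSink:
    def __init__(self):
        self.store = {}

    def write(self, key, value):
        # Overwrite-with-same-value leaves the store unchanged from the
        # reader's perspective, so replays are harmless.
        self.store[key] = value

sink = KVSink()
for key, value in [("e1", 10), ("e2", 20)]:
    sink.write(key, value)

# Crash + recovery replays the first two records plus a new one.
for key, value in [("e1", 10), ("e2", 20), ("e3", 30)]:
    sink.write(key, value)
```

Note this only works because values are never updated to something different; if the same key could legitimately receive two different values, you would need option 1 or 3 instead.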

To reiterate: data always needs to be read more than once in case of failures, which is why exactly-once at the source level alone does not make sense. It needs to be implemented across the whole framework/application.
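For completeness, option 1 above (defer output until a checkpoint completes) can also be sketched; the BufferedSink class is a made-up illustration of the idea, not Flink API. Results become visible only after the checkpoint that covers them is durable, so readers never see output from a replayable stretch of input:

```python
# Sketch: buffer results and publish them only once a checkpoint has
# been written. Uncheckpointed results are discarded on failure and
# recomputed from the replayed input.
class BufferedSink:
    def __init__(self):
        self.pending = []    # results since the last checkpoint
        self.published = []  # results visible to downstream readers

    def write(self, record):
        self.pending.append(record)

    def on_checkpoint_complete(self):
        # The checkpoint durably recorded the offsets that produced
        # self.pending, so these results will never be replayed.
        self.published.extend(self.pending)
        self.pending.clear()

    def on_failure(self):
        # Discard uncheckpointed results; the source replays the
        # corresponding input after recovery.
        self.pending.clear()

sink = BufferedSink()
sink.write("r1")
sink.write("r2")
sink.on_checkpoint_complete()
sink.write("r3")
sink.on_failure()  # "r3" is dropped and will be recomputed on replay
```

The price is latency: nothing is visible downstream until the next checkpoint, which is exactly the "huge lag" mentioned in option 1.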

Getting the sink right is the hardest part. My guess is that Spark does not support that yet. It might be harder for the Spark developers because of the different ways checkpoints are actually taken in each of the mentioned systems. But my Spark (Streaming) knowledge is over a year old, so there may have been developments in that regard.