7
votes

I looked around hard but didn't find a satisfactory answer to this. Maybe I'm missing something. Please help.

We have a Spark Streaming application consuming a Kafka topic that needs to ensure end-to-end processing (e.g. updating a database) before advancing the Kafka offsets. This is much like building transaction support into the streaming system: guaranteeing that each message is not only processed (transformed) but, more importantly, output.

I have read about Kafka direct streams. The documentation says that for robust failure recovery in direct-stream mode, Spark checkpointing should be enabled, which stores the offsets along with the checkpoints. But the offset management is done internally (by setting Kafka config params like ["auto.offset.reset", "enable.auto.commit", "auto.commit.interval.ms"]). It does not say how (or whether) we can customize when offsets are committed (e.g. once we have written to the database). In other words, can we set "enable.auto.commit" to false and manage the offsets (not unlike a DB connection) ourselves?
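To make this concrete, here is roughly the consumer configuration I have in mind (a sketch only; the broker address and group id are placeholders):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker1:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-streaming-app");
kafkaParams.put("enable.auto.commit", false);  // disable auto-commit...
// ...and then advance the offsets ourselves only after the database write has succeeded.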

Any guidance/help is greatly appreciated.

3
Is there any Python implementation available for manual offset commits in PySpark? I am not able to find it anywhere. - Girish Gupta

3 Answers

1
votes

The article below could be a good starting point for understanding the approach.

spark-kafka-achieving-zero-data-loss

Furthermore,

The article suggests using the ZooKeeper client directly, which could also be replaced with something like Kafka's SimpleConsumer. The advantage of using ZooKeeper (or the SimpleConsumer) is that monitoring tools which depend on offsets saved in ZooKeeper keep working. The offsets can also be saved to HDFS or any other reliable storage.
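For illustration, here is a sketch of that pattern in Java with the 0-10 direct stream API (not the exact code from the article): loadOffsetsFromStore() and saveOffsetsToStore() are hypothetical helpers backed by whatever store you choose (ZooKeeper, HDFS, a database). The stream is started from the saved offsets, and offsets are persisted only after the batch's output has succeeded:

import java.util.Arrays;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

// Offsets previously saved by this application (hypothetical helper).
Map<TopicPartition, Long> fromOffsets = loadOffsetsFromStore();

// kafkaParams as usual (with "enable.auto.commit" set to false);
// jssc is an existing JavaStreamingContext.
JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(
            Arrays.asList("my-topic"), kafkaParams, fromOffsets));

stream.foreachRDD(rdd -> {
  OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // ... transform and write the batch to its destination ...

  saveOffsetsToStore(ranges);  // hypothetical helper: persist offsets only after the output succeeded
});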

0
votes

If you check your logs you will see:

2019-10-24 14:14:45 WARN  KafkaUtils:66 - overriding enable.auto.commit to false for executor
2019-10-24 14:14:45 WARN  KafkaUtils:66 - overriding auto.offset.reset to none for executor
2019-10-24 14:14:45 WARN  KafkaUtils:66 - overriding executor group.id to spark-executor-customer_pref_az_kafka_spout_stg_2
2019-10-24 14:14:45 WARN  KafkaUtils:66 - overriding receive.buffer.bytes to 65536 see KAFKA-3135

These properties are overridden by the Spark code.

To commit offsets manually, you can follow the Spark docs:

https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
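For context, a minimal stream setup in Java (a sketch; broker address, group id and topic are placeholders). The warnings above just mean that, whatever you pass on the driver, Spark forces enable.auto.commit to false and auto.offset.reset to none on the executors and prefixes the executor group.id with spark-executor-. The actual committing is then done explicitly with the commitAsync API described in the linked docs section:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

// Driver-side consumer config; values are placeholders.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker1:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-streaming-app");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);  // commits are done explicitly, per the docs

// jssc is an existing JavaStreamingContext.
JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList("my-topic"), kafkaParams));

The object returned by createDirectStream is the one you later cast to CanCommitOffsets when committing.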

-1
votes

According to https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#kafka-itself, enable.auto.commit must be set to false, and the offsets committed explicitly after your outputs have completed:

import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

// "stream" is the JavaInputDStream returned by KafkaUtils.createDirectStream
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
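Two caveats from the same docs page: the cast to CanCommitOffsets (like HasOffsetRanges) only succeeds on the object returned directly by createDirectStream, not after transformations, and commitAsync is asynchronous and not transactional, so for exactly-once results your output still needs to be idempotent, or you need to store the offsets atomically together with the results in your own data store.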