I am using the strategy provided here in the Spark Streaming documentation for committing offsets to Kafka itself. My flow looks like this: Topic A --> Spark Streaming [foreachRDD: process -> send to Topic B] --> commit offset to Topic A
JavaInputDStream<ConsumerRecord<String, Request>> kafkaStream = KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, Request>Subscribe(inputTopics, kafkaParams)
);

kafkaStream.foreachRDD(rdd -> {
    // Grab the offset ranges for this batch before doing anything else
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    rdd.foreachPartition(consumerRecords -> {
        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
        System.out.println(String.format("%s %d %d %d", o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));
        // doProcess sends a new event to Topic B for each record
        consumerRecords.forEachRemaining(record -> doProcess(record));
    });

    // Commit the offsets of this batch back to Topic A
    ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
});
So let's say the RDD gets 10 events from Topic A, and while processing each of them I send a new event to Topic B. Now suppose one of those sends fails. I don't want to commit that particular offset back to Topic A. Topic A and Topic B have the same number of partitions, N, so each RDD partition should map to the same Kafka partition. What would be the best strategy to keep processing? How can I reset the stream so it retries those events from Topic A until it succeeds? I know I can't just keep processing that partition and keep committing, because the commit would move the offset past the failed record and it would never be processed again.
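To make the failure case concrete, the body inside foreachRDD would look roughly like this (just a sketch, assuming doProcess throws when the send to Topic B fails):

rdd.foreachPartition(consumerRecords -> {
    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
    consumerRecords.forEachRemaining(record -> {
        try {
            doProcess(record); // sends the corresponding event to Topic B
        } catch (Exception e) {
            // For this partition I don't want the commit to move past
            // record.offset(); the other partitions of the batch were fine.
        }
    });
});

// But if this still runs for the whole batch, the failed record is skipped forever.
((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);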
I don't know if it is possible for the stream/RDD to keep retrying the same messages for that one partition while the other partitions/RDDs keep working. If I throw an exception from that particular RDD, what would happen to my job? Would it fail? Would I need to restart it manually? With regular consumers you can retry/recover, but I am not sure what happens with Streaming.
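For instance, the only thing I can think of is rethrowing from the partition so the Spark task itself fails (again just a sketch, assuming doProcess throws on a failed send):

rdd.foreachPartition(consumerRecords ->
    consumerRecords.forEachRemaining(record -> {
        try {
            doProcess(record);
        } catch (Exception e) {
            // Rethrowing should make this Spark task fail and get retried
            // (up to spark.task.maxFailures), but I'm not sure whether the
            // streaming job as a whole stops or keeps going after that.
            throw new RuntimeException(
                "Failed processing offset " + record.offset() + " of partition " + record.partition(), e);
        }
    })
);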