I use Spark Streaming to read data from Kafka and process every line.
I create the stream like this:
JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
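For context, this snippet assumes jssc, topics, and kafkaParams already exist. A minimal sketch of that setup (the broker address, group id, and topic name below are placeholders, not part of the original question) could look like:

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-consumer-group");
kafkaParams.put("auto.offset.reset", "latest");
// disable auto commit so offsets are only committed manually after processing
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("my-topic");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));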
Then I use the following code to process the data from Kafka:
lines.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> {
    // get the offset ranges of this batch
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition((Iterator<ConsumerRecord<String, String>> partitionOfRecords) -> {
        // get the Kafka offset range of this partition
        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
        OffsetRange[] range = new OffsetRange[]{ o };
        // to cache the line data
        List<String> jsonData = new ArrayList<>();
        // read all lines of this partition
        while (partitionOfRecords.hasNext()) {
            ConsumerRecord<String, String> line = partitionOfRecords.next();
            jsonData.add(line.value());
        }
        // TODO do my own business with jsonData
        .......
        // HOW can I commit the Kafka offset here??
        // this is a method to commit offsets
        ((CanCommitOffsets) lines.inputDStream()).commitAsync(range);
    });
});
I have tried this many times and found a few problems:

What happens if the data in one partition is processed successfully while another partition fails? Does that mean all of my data has to be processed again, because the Kafka offsets have already been committed?

When I ran this code, I found that the commit is only actually executed the next time the batch runs. Does that mean that if the process hits an OOM or is killed, some of the data I read from Kafka the next time will be duplicated?
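For reference, the pattern shown in the Spark Streaming Kafka 0-10 integration guide commits the whole batch's offset ranges from the driver, after the per-partition processing has finished, instead of calling commitAsync inside foreachPartition. A sketch of that approach, reusing the lines stream from above (the processing body is left out):

lines.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition(partitionOfRecords -> {
        // process the records of this partition here
    });
    // commit only after every partition of this batch has been processed;
    // commitAsync just queues the offsets and they are really committed on
    // the next batch, so a crash in between can still replay this batch
    ((CanCommitOffsets) lines.inputDStream()).commitAsync(offsetRanges);
});

Even with this placement the guarantee is at-least-once, which matches the duplication behavior described in the second question; exactly-once would need idempotent output or storing offsets in your own transactional store.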