3 votes

I use Spark Streaming to read Kafka data and process every line.

I use the code below to create the stream:

lines = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);

and then I use this code to process the data from Kafka:

    lines.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        rdd.foreachPartition((Iterator<ConsumerRecord<String, String>> partitionOfRecords) -> {
            // get the Kafka offset range for this partition
            OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
            OffsetRange[] range = new OffsetRange[]{o};
            // cache the line data
            List<String> jsonData = new ArrayList<>();
            // read every record in this partition
            while (partitionOfRecords.hasNext()) {
                ConsumerRecord<String, String> line = partitionOfRecords.next();
                jsonData.add(line.value());
            }
            // TODO: do my own business logic with jsonData
            // ...
            // HOW can I commit the Kafka offset here??
            // this is a method to commit offsets
            ((CanCommitOffsets) lines.inputDStream()).commitAsync(range);
        });
    });

I have tried this many times, and I found it has some problems:

  1. What happens if processing succeeds in one partition but fails in another? Does all of my data have to be processed again, even though the Kafka offset has already been committed?

  2. When I ran this code, I found that the commit operation is only really executed the next time this RDD's batch runs. Does that mean that if the process OOMs or is killed, some of the data I read from Kafka the next time will be duplicated?


1 Answer

0 votes

What happens if processing succeeds in one partition but fails in another? Does all of my data have to be processed again, even though the Kafka offset has already been committed?

If a particular task fails, Spark will attempt to re-execute it in place according to the spark.task.maxFailures setting. If that number is exceeded, the entire job fails. You need to make sure that if the part before commitAsync fails, you do not commit the offsets.
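As a sketch of what that looks like in practice: the usual pattern (roughly what the spark-streaming-kafka-0-10 integration guide shows) is to capture the offset ranges on the driver, run the per-partition processing, and only call commitAsync once the whole batch has succeeded, so a failed batch never commits its offsets. This sketch reuses the jssc, topics and kafkaParams from your question:

    import java.util.Iterator;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.kafka010.*;

    JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    stream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> {
        // capture the offset ranges on the driver before running any action
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // process the data; if this throws, the commit below is never reached,
        // so the batch is reprocessed from the old offsets on restart
        rdd.foreachPartition((Iterator<ConsumerRecord<String, String>> partitionOfRecords) -> {
            while (partitionOfRecords.hasNext()) {
                ConsumerRecord<String, String> record = partitionOfRecords.next();
                // business logic goes here
            }
        });

        // commit only after the whole batch has succeeded; this runs on the driver
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    });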

When I ran this code, I found that the commit operation is only really executed the next time this RDD's batch runs. Does that mean that if the process OOMs or is killed, some of the data I read from Kafka the next time will be duplicated?

Yes. commitAsync only queues the offset ranges; the actual commit is issued during a later batch, so if the job is killed before that happens, Spark will re-read data that has already been processed.
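If you want to observe when the commit actually reaches Kafka (it normally happens during a later batch), CanCommitOffsets also has a commitAsync overload that takes a Kafka OffsetCommitCallback. A minimal sketch, meant to sit inside the foreachRDD from the previous snippet; the log messages are just illustrative:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.spark.streaming.kafka010.CanCommitOffsets;

    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges,
            (Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) -> {
                if (exception != null) {
                    // the commit failed; these offsets will be read again after a restart
                    System.err.println("offset commit failed: " + exception);
                } else {
                    // fires only when the commit is actually sent, usually during a later batch
                    System.out.println("committed offsets: " + offsets);
                }
            });

Either way commitAsync gives you at-least-once semantics, so if duplicates are not acceptable the usual advice is to make the downstream writes idempotent, or to store the offsets transactionally together with the results.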