I am using the strategy from the Spark Streaming documentation for committing offsets to Kafka itself. My flow looks like this: Topic A --> Spark Stream [foreachRDD: process -> send to topic B] --> commit offset to topic A

    JavaInputDStream<ConsumerRecord<String, Request>> kafkaStream = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, Request>Subscribe(inputTopics, kafkaParams)
    );

    kafkaStream.foreachRDD(rdd -> {
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                rdd.foreachPartition(
                        consumerRecords -> {
                            OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                            System.out.println(String.format("%s %d %d %d", o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));
                            consumerRecords.forEachRemaining(record -> doProcess(record));
                        });

                ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
            }
    );

So let's say the RDD gets 10 events from topic A, and while processing each of them I send a new event to topic B. Now suppose that one of those sends fails. I don't want to commit that particular offset to topic A. Topics A and B have the same number of partitions N, so each RDD partition should be consuming from a single Kafka partition. What would be the best strategy to keep processing? How can I reset the stream so that it keeps trying to process those events from topic A until it succeeds? I know I can't simply continue processing that partition and commit, because that would move the offset forward and the failed record would never be processed again.

I don't know whether it is possible for the stream/RDD to keep retrying the same messages for that partition only, while the other partitions/RDDs keep working. If I throw an exception from that particular RDD, what would happen to my job? Would it fail? Would I need to restart it manually? With regular consumers you can retry/recover, but I am not sure what happens with Streaming.

1 Answer

This is what I came up with. It takes the input data and then sends a message to the output topic. The producer has to be created inside the foreachPartition loop; otherwise Spark will try to serialize it and send it to all the workers. Note that producer.send is asynchronous, so the code waits on the returned Future before moving on to the next record. Because the offsets are committed only after the records have been sent, this gives the system at-least-once semantics.

    kafkaStream.foreachRDD(rdd -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        rdd.foreachPartition(partition -> {
            OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
            System.out.println(String.format("%s %d %d %d", o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));

            // Print statements in this section are shown in the executor's stdout logs.
            // The producer is created here, on the executor, so Spark never has to serialize it.
            KafkaProducer<String, Response> producer = new KafkaProducer<>(producerConfig(o.partition()));
            try {
                partition.forEachRemaining(record -> {
                    System.out.println("request: " + record.value());

                    Response data = new  Response …
                    // As a debugging technique, users can write to DBFS to verify that records are being written out
                    // dbutils.fs.put("/tmp/test_kafka_output",data,true)
                    ProducerRecord<String, Response> message = new ProducerRecord<>(outputTopic, null, data);
                    Future<RecordMetadata> result = producer.send(message);
                    try {
                        // Block until the broker acknowledges the record
                        RecordMetadata metadata = result.get();
                        System.out.println(String.format("offset='%d' partition='%d' topic='%s' timestamp='%d'",
                                metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp()));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    } catch (ExecutionException e) {
                        // Fail the task so the offsets for this batch are not committed below
                        throw new RuntimeException(e);
                    }
                });
            } finally {
                producer.close();
            }
        });

        // Reached only if every partition was processed without an exception
        ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
    });
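
On the retry part of the question, one possible approach is to retry the blocking send a bounded number of times and only then let the exception escape, so that the task fails and the commit is never reached. The sketch below is an assumption about how that could look, not something taken from Spark or Kafka: RetryingSend, withRetry, maxAttempts and backoffMillis are hypothetical names, and the helper uses only the JDK so it can be tried on its own.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper for illustration; not part of Spark or Kafka. */
final class RetryingSend {
    private RetryingSend() {}

    /**
     * Runs a blocking action up to maxAttempts times, sleeping backoffMillis
     * between failed attempts. Throws IllegalStateException if every attempt fails.
     */
    static <T> T withRetry(Callable<T> action, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry after an interrupt; restore the flag and stop
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while sending", e);
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(backoffMillis);
                }
            }
        }
        // All attempts failed: surface the last error to the caller
        throw new IllegalStateException("giving up after " + maxAttempts + " attempts", last);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during backoff", e);
        }
    }
}
```

Inside forEachRemaining, the blocking call could then be written as RetryingSend.withRetry(() -> producer.send(message).get(), 3, 500L), where 3 and 500L are arbitrary example values. If the exception still escapes, Spark retries the task up to spark.task.maxFailures times; if the task keeps failing, the batch job fails and the commitAsync call for that batch is not executed, so the records are read again after a restart. The Kafka producer also has its own retry settings (retries, delivery.timeout.ms), which may make an outer loop like this unnecessary in practice.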