I am trying to use Spark's Direct Approach (No Receivers) for Kafka, and I have the following Kafka configuration map:

    Map<String, String> configMap = new HashMap<>();
    configMap.put("zookeeper.connect", "192.168.51.98:2181");
    configMap.put("group.id", UUID.randomUUID().toString());
    configMap.put("auto.offset.reset", "smallest");
    configMap.put("auto.commit.enable", "true");
    configMap.put("topics", "IPDR31");
    configMap.put("kafka.consumer.id", "kafkasparkuser");
    configMap.put("bootstrap.servers", "192.168.50.124:9092");

My objective is this: if my Spark pipeline crashes and is restarted, the stream should resume from the latest offset committed by the consumer group. For that, I want to specify the starting offset for the consumer. I know the committed offset for each partition. How can I supply this information to the streaming function? Currently I am using:

    JavaPairInputDStream<byte[], byte[]> kafkaData =
        KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
            DefaultDecoder.class, DefaultDecoder.class, configMap, topic);
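A minimal sketch, assuming the Kafka 0.8 integration (spark-streaming-kafka) that provides the call above: `KafkaUtils` also has a Java overload of `createDirectStream` that takes a record class, a `Map<TopicAndPartition, Long>` of starting offsets (`fromOffsets`) and a message handler. The class `ResumeFromOffsets`, the helpers `toFromOffsets` and `createResumedStream`, and the `committed` map (partition number to offset, standing in for wherever the per-partition offsets are stored) are hypothetical. This overload returns a `JavaInputDStream<R>` rather than a `JavaPairInputDStream`.

```java
import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Hypothetical helper class; names are illustrative, not part of Spark.
public class ResumeFromOffsets {

    // Converts per-partition offsets (partition -> offset) for one topic into
    // the Map<TopicAndPartition, Long> expected by the fromOffsets overload.
    // Assumes each stored value is the NEXT offset to read (Kafka's convention
    // for committed offsets); if you stored the last processed offset, add 1.
    static Map<TopicAndPartition, Long> toFromOffsets(String topic,
                                                      Map<Integer, Long> committed) {
        Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
        for (Map.Entry<Integer, Long> e : committed.entrySet()) {
            fromOffsets.put(new TopicAndPartition(topic, e.getKey()), e.getValue());
        }
        return fromOffsets;
    }

    // Creates a direct stream that starts exactly at the given offsets
    // instead of relying on auto.offset.reset.
    static JavaInputDStream<byte[]> createResumedStream(JavaStreamingContext js,
                                                        Map<String, String> configMap,
                                                        String topic,
                                                        Map<Integer, Long> committed) {
        return KafkaUtils.createDirectStream(
                js,
                byte[].class, byte[].class,
                DefaultDecoder.class, DefaultDecoder.class,
                byte[].class,                    // record type R produced by the handler
                configMap,
                toFromOffsets(topic, committed),
                // Keep only the message payload; MessageAndMetadata also
                // exposes key(), topic(), partition() and offset().
                (MessageAndMetadata<byte[], byte[]> mmd) -> mmd.message());
    }
}
```

The direct approach tracks offsets inside Spark and does not update them in ZooKeeper, so for this restart path to work the application has to save the per-partition offsets itself (for example, from each batch's offset ranges) and load them into `committed` before creating the stream.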