4 votes

I am trying to use Spark's Direct Approach (No Receivers) for Kafka. I have the following Kafka configuration map:

// Kafka consumer configuration for the direct stream
Map<String, String> configMap = new HashMap<>();
configMap.put("zookeeper.connect", "192.168.51.98:2181");
configMap.put("group.id", UUID.randomUUID().toString());
configMap.put("auto.offset.reset", "smallest");
configMap.put("auto.commit.enable", "true");
configMap.put("topics", "IPDR31");
configMap.put("kafka.consumer.id", "kafkasparkuser");
configMap.put("bootstrap.servers", "192.168.50.124:9092");

Now my objective is: if my Spark pipeline crashes and is restarted, the stream should resume from the latest offset committed by the consumer group. For that purpose, I want to specify the starting offsets for the consumer. I have information about the offsets committed in each partition. How can I supply this information to the streaming function? Currently I am using:

JavaPairInputDStream<byte[], byte[]> kafkaData =
    KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class, configMap, topic);
How to get the information about the offsets committed in each partition? Can you please explain that? - void

You can get this information from ZooKeeper. You can refer to this to figure out the code in Java. - abi_pat

But using the direct stream, is it not possible without updating ZooKeeper? Can checkpointing achieve this? - void

2 Answers

6 votes

Look at the second form of createDirectStream in the Spark API docs - it allows you to pass in a Map<TopicAndPartition, Long>, where the Long is the offset.
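
For illustration, here is a minimal sketch of that overload from the Spark 1.x Kafka 0.8 API, reusing js and configMap from the question; the partition offsets are hypothetical placeholders you would load from wherever you persisted them:

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

// Starting offsets per partition (hypothetical values loaded from your store)
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition("IPDR31", 0), 1234L);
fromOffsets.put(new TopicAndPartition("IPDR31", 1), 5678L);

// Maps each consumed message to the element type of the resulting stream
Function<MessageAndMetadata<byte[], byte[]>, Tuple2<byte[], byte[]>> messageHandler =
    mmd -> new Tuple2<>(mmd.key(), mmd.message());

// Java generics workaround to get a Class token for Tuple2<byte[], byte[]>
@SuppressWarnings("unchecked")
Class<Tuple2<byte[], byte[]>> recordClass =
    (Class<Tuple2<byte[], byte[]>>) (Class<?>) Tuple2.class;

JavaInputDStream<Tuple2<byte[], byte[]>> kafkaData =
    KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class, recordClass,
        configMap, fromOffsets, messageHandler);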

Note that Spark will not automatically update your offsets in ZooKeeper when using the direct stream - you have to write them yourself, either to ZK or to some other store. Unless you have a strict requirement for exactly-once semantics, it is easier to use the createStream method to get back a DStream, in which case Spark will update the offsets in ZK and resume from the last stored offset in case of failure.
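
If you stick with the direct stream, the usual pattern for persisting offsets yourself is to read the offset ranges off each batch. A sketch, assuming Spark 1.5+ (where foreachRDD accepts a VoidFunction) and a hypothetical saveOffset helper for whatever store you choose:

import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

kafkaData.foreachRDD(rdd -> {
    // RDDs produced by the direct stream implement HasOffsetRanges
    OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    for (OffsetRange r : ranges) {
        // Persist topic/partition/untilOffset to ZK or your database
        saveOffset(r.topic(), r.partition(), r.untilOffset()); // hypothetical helper
    }
});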

2 votes

For your requirement, the correct solution is to use checkpointing. For every batch processed, checkpointing writes the metadata to the shared storage you specify (typically HDFS). It is metadata, not the real data, so there is no real performance impact.

If the Spark process crashes and is restarted, it will first read the checkpoint and resume from the offsets saved in it.
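
A minimal sketch of that recovery pattern with JavaStreamingContext.getOrCreate, assuming a hypothetical HDFS checkpoint path; note the whole pipeline must be built inside the factory function so it can be reconstructed from the checkpoint on restart:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

final String checkpointDir = "hdfs:///user/spark/checkpoints/ipdr"; // hypothetical path

// On first start the factory runs; on restart the context is rebuilt from the checkpoint
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, () -> {
    SparkConf conf = new SparkConf().setAppName("KafkaDirectRecovery");
    JavaStreamingContext js = new JavaStreamingContext(conf, Durations.seconds(10));
    js.checkpoint(checkpointDir);
    // create the Kafka direct stream and all transformations here
    return js;
});

jssc.start();
jssc.awaitTermination();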

You can refer to the sample code where I use Spark Streaming to write data to Elasticsearch reliably using checkpointing: https://github.com/atulsm/Test_Projects/blob/master/src/spark/StreamingKafkaRecoverableDirectEvent.java