I am new to Kafka Streams and I am experimenting with how Kafka Streams behaves in case of timeouts.
Here is the scenario I am testing, using the Processor API:
My Kafka Streams app consumes from a Kafka topic (String key, String message) and writes to a Kafka topic (String key, String message).
I have set the consumer config parameter max.poll.interval.ms to 60000 ms.
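For reference, the settings described in this question (max.poll.interval.ms = 60000 and, from the additional info, num.stream.threads = 2) could be passed to Kafka Streams as plain string keys in a java.util.Properties object. This is a minimal sketch, not the exact setup used here: the application.id and bootstrap.servers values are hypothetical placeholders, and the "consumer." prefix (the string that StreamsConfig.consumerPrefix() produces) is assumed as the way to scope max.poll.interval.ms to the consumers.

```java
import java.util.Properties;

public class StreamsProps {
    // Builds the Streams configuration described in the question.
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "timeout-experiment");   // hypothetical placeholder
        props.put("bootstrap.servers", "localhost:9092");    // hypothetical placeholder
        // Two stream threads, matching the 2 input partitions.
        props.put("num.stream.threads", "2");
        // "consumer." prefix applies the setting to the consumers only; same key as
        // StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG).
        props.put("consumer.max.poll.interval.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        // Print every configured key/value pair.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object is what would be handed to the KafkaStreams constructor together with the topology.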
My process method looks like this:
@Override
public void process(String key, String value) {
    // dtf (a DateTimeFormatter) and context (the ProcessorContext) are fields of this processor
    System.out.println("the key is : " + key);
    LocalDateTime start = LocalDateTime.now();
    System.out.println("startTime:" + dtf.format(start));
    if (key.startsWith("12345678")) {
        try {
            Thread.sleep(80000); // longer than max.poll.interval.ms (60000)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    System.out.println("done sleeping");
    LocalDateTime end = LocalDateTime.now();
    System.out.println("endTime:" + dtf.format(end));
    System.out.println("Offset*****" + context.offset() + " partitionId****" + context.partition()
            + "taskId*****" + context.taskId() + "javaThreadId*******" + Thread.currentThread().getId()
            + " value****" + value);
}
All other configurations are set to default.
I am trying to see how the app behaves when the processing time exceeds max.poll.interval.ms.
This is what happens: on the first attempt, the app consumes the message from the input topic and calls process(), which starts sleeping. After 60000 ms, process() is called again without any exception being thrown, but this time it exits the sleep after just 20000 ms, prints "done sleeping", and writes the message to the output topic. After this, it consumes the same message again from the same offset without committing, and this repeats in a loop.
Sample output:
the key is : 12345678
startTime:2018/07/09 07:34:25
the key is : 12345678
startTime:2018/07/09 07:35:27
done sleeping
endTime:2018/07/09 07:35:45
Offset*****224 partitionId****0taskId*****0_0javaThreadId*******12 value****abc
the key is : 12345678
startTime:2018/07/09 07:36:27
done sleeping
endTime:2018/07/09 07:36:47
Offset*****224 partitionId****0taskId*****0_0javaThreadId*******14 value****abc
the key is : 12345678
startTime:2018/07/09 07:37:27
done sleeping
endTime:2018/07/09 07:37:47
Offset*****224 partitionId****0taskId*****0_0javaThreadId*******12 value****abc
I have also tried calling context#commit() explicitly, but that doesn't help either. What am I missing here? Does Kafka Streams remember the previous processing state? If not, why does it print "done sleeping" exactly 20000 ms after the first attempt (max.poll.interval.ms = 60000 ms, processing time (sleep) = 80000 ms)?
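The startTime and endTime values in the sample output can be compared directly to see where the 20000 ms figure comes from. A small sketch using java.time, assuming dtf uses the pattern "yyyy/MM/dd HH:mm:ss" (inferred from the printed timestamps; the question does not show the pattern). For each endTime it computes the gap to the startTime printed just before it and to the startTime before that one.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimingCheck {
    // Assumed pattern, matching the format of the logged timestamps.
    static final DateTimeFormatter DTF = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");

    // Whole seconds elapsed between two logged timestamps.
    static long secondsBetween(String from, String to) {
        return Duration.between(LocalDateTime.parse(from, DTF), LocalDateTime.parse(to, DTF)).getSeconds();
    }

    public static void main(String[] args) {
        // startTime and endTime values copied from the sample output, in log order.
        String[] starts = {"2018/07/09 07:34:25", "2018/07/09 07:35:27",
                           "2018/07/09 07:36:27", "2018/07/09 07:37:27"};
        String[] ends = {"2018/07/09 07:35:45", "2018/07/09 07:36:47", "2018/07/09 07:37:47"};
        for (int i = 0; i < ends.length; i++) {
            long sinceLatest = secondsBetween(starts[i + 1], ends[i]);  // startTime logged just before
            long sincePrevious = secondsBetween(starts[i], ends[i]);    // the startTime before that
            System.out.println(ends[i] + ": " + sinceLatest + " s after the latest startTime, "
                    + sincePrevious + " s after the one before");
        }
    }
}
```

Each endTime is 18 to 20 s after the most recent startTime but exactly 80 s (the full sleep) after the startTime logged before it, so each "done sleeping" line is consistent with an 80000 ms sleep that began one attempt earlier.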
Additional info:
My input and output topics have 2 partitions each, and I have set the StreamsConfig parameter num.stream.threads to 2.
I have a 3-node Kafka cluster; Kafka and Kafka Streams are both version 1.1.0.
I do not use the punctuate method or any complex processing anywhere.
Thanks in advance.