We are using Apache Kafka Streams 0.10.2.0 in an application. We are leveraging the Kafka Streams topology to pass the processed data on to the next topic until the end of processing.
We also deploy the consumer application in AWS ECS containers. We have observed the consumer picking up very old messages to process, even though they were processed earlier. This issue happens randomly when the service scales up/down or during new deployments. I understand that some messages can be reprocessed at the time of consumer rebalancing, but in this case it is reprocessing a large number of messages that were successfully processed a long time ago (more than 10 days old).
We are not able to identify the root cause of this issue. Is it failing to commit the offsets properly and therefore picking up random messages in a different topology? This leads to the inconsistent behavior of one message being re-processed in any of the topologies.
Surprisingly, we also don't see any exceptions in the consumer. Please help.
Here are the configurations we are using:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,"UniqueKey");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG,key);
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60000);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 6);
Here is the code snippet for Processors:
final KStreamBuilder builder = new KStreamBuilder();
builder.addSource(key, Serdes.String().deserializer(), executor.getDeserializer(), key);
builder.addProcessor(key + "_processor", () -> new KafkaProcessor(), key);
builder.addSink(key + "_sink", key + "_sink", key + "_processor");
final KafkaStreams streams = new KafkaStreams(builder, StreamConfigurations.getStreamsConfigurations(key, kafkaHost));
streams.start();
streams.setUncaughtExceptionHandler((t, th) -> {
    _logger.error("UncaughtException in Kafka StreamThread " + t.getName(), th);
});
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
I have looked into some Kafka re-processing blog posts and am thinking of trying some additional configurations, listed below:
streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); //default is 10000
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); //default is 3000
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
streamsConfiguration.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000); //default is 5000
streamsConfiguration.put(ProducerConfig.ACKS_CONFIG, "1"); //acks is a String config
streamsConfiguration.put(ProducerConfig.RETRIES_CONFIG,10);
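If these overrides are intended only for the embedded consumer or producer, one option I am considering (assuming `StreamsConfig.consumerPrefix()` / `producerPrefix()` are available in the kafka-streams version in use) is namespacing each override so it only reaches the intended client:

```java
// Sketch, assuming StreamsConfig.consumerPrefix()/producerPrefix() exist in
// this kafka-streams version; they namespace a config so that it is applied
// only to the embedded consumer or producer, respectively.
streamsConfiguration.put(
        StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 20000);
streamsConfiguration.put(
        StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
```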
Thanks, Alpa
`executor`? What is `KafkaProcessor` doing, perhaps share its code? Also, somewhat unrelated, IIRC you should set the UncaughtExceptionHandler prior to calling `streams.start()`. - Michael G. Noll
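Following up on the comment above, a minimal reordering (a sketch based on the code in the question, not a tested fix) would register the handler before starting the streams instance, so that the stream threads created by `start()` already have it:

```java
// Sketch: register the uncaught exception handler before start(), so any
// thread that fails during startup is already covered.
final KafkaStreams streams = new KafkaStreams(builder,
        StreamConfigurations.getStreamsConfigurations(key, kafkaHost));
streams.setUncaughtExceptionHandler((t, th) ->
        _logger.error("UncaughtException in Kafka StreamThread " + t.getName(), th));
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
```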