I have a consume-transform-produce workflow in a microservice using Spring Boot and Spring Kafka. I need the exactly-once semantics provided by Kafka transactions. The relevant code is below:
Config
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
    DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(props);
    defaultKafkaProducerFactory.setTransactionIdPrefix("kafka-trx-");
    return defaultKafkaProducerFactory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager() {
    return new KafkaTransactionManager<>(producerFactory());
}

@Bean
@Qualifier("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<String, Object> chainedKafkaTransactionManager(KafkaTransactionManager<String, String> kafkaTransactionManager) {
    return new ChainedKafkaTransactionManager<>(kafkaTransactionManager);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> concurrentKafkaListenerContainerFactory(ChainedKafkaTransactionManager<String, Object> chainedKafkaTransactionManager) {
    ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
    concurrentKafkaListenerContainerFactory.setBatchListener(true);
    concurrentKafkaListenerContainerFactory.setConcurrency(nexusConsumerConcurrency);
    //concurrentKafkaListenerContainerFactory.setReplyTemplate(kafkaTemplate());
    concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    concurrentKafkaListenerContainerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
    return concurrentKafkaListenerContainerFactory;
}
Listener
@KafkaListener(topics = "${kafka.xxx.consumerTopic}", groupId = "${kafka.xxx.consumerGroup}", containerFactory = "concurrentKafkaListenerContainerFactory")
public void listen(@Payload List<String> msgs, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Integer> offsets) {
    int i = -1;
    for (String msg : msgs) {
        ++i;
        LOGGER.debug("partition={}; offset={}; msg={}", partitions.get(i), offsets.get(i), msg);
        String json = transform(msg);
        kafkaTemplate.executeInTransaction(kt -> kt.send(producerTopic, json));
    }
}
However, in the production environment I see something odd: the offset increases by two for each message the producer sends, and the consumer does not appear to commit its consumed offset.
Topic1 consumer detail
Produce to topic2
The number of messages sent by the producer does match the number consumed, though. The service downstream of the producer keeps receiving messages from topic2 without interruption, and there are no errors or exceptions in the log.
Why does the consume-transform-produce workflow seem to work (with exactly-once semantics apparently preserved) while the consumed offset is not committed and the produced offset advances by two instead of one for every message?
How can I fix this? Thanks!
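One possible reading of the offset gap, offered as an assumption rather than a confirmed diagnosis: when a Kafka transaction commits, the broker appends a commit marker (a control record) to each partition the transaction wrote to, and that marker occupies an offset of its own. Because `executeInTransaction` is called once per message in the listener above, each message would sit in its own transaction and be followed by one marker. The plain-Java sketch below only models that bookkeeping. `TransactionalLogSketch`, `PartitionLog`, and `produce` are hypothetical names, no Kafka client is involved, and the topic1 offset commit is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactionalLogSketch {

    /** Simplified model of one partition log: every entry occupies exactly one offset. */
    public static final class PartitionLog {
        private final List<String> entries = new ArrayList<>();

        /** Appends a data record and returns the offset assigned to it. */
        public long appendRecord(String value) {
            entries.add(value);
            return entries.size() - 1;
        }

        /** Appends a commit marker (control record); it takes an offset like any other entry. */
        public long appendCommitMarker() {
            entries.add("<COMMIT>");
            return entries.size() - 1;
        }

        /** Offset that the next appended entry would receive. */
        public long endOffset() {
            return entries.size();
        }
    }

    /**
     * Writes messageCount records, committing one transaction after every
     * recordsPerTransaction records. Returns the offsets of the data records only.
     */
    public static List<Long> produce(PartitionLog log, int messageCount, int recordsPerTransaction) {
        List<Long> dataOffsets = new ArrayList<>();
        int openRecords = 0;
        for (int i = 0; i < messageCount; i++) {
            dataOffsets.add(log.appendRecord("msg-" + i));
            openRecords++;
            if (openRecords == recordsPerTransaction) {
                log.appendCommitMarker();
                openRecords = 0;
            }
        }
        if (openRecords > 0) {
            // Commit the final, partially filled transaction.
            log.appendCommitMarker();
        }
        return dataOffsets;
    }

    public static void main(String[] args) {
        // One transaction per message, as assumed for the per-message executeInTransaction call.
        PartitionLog perMessage = new PartitionLog();
        System.out.println(produce(perMessage, 3, 1) + " end=" + perMessage.endOffset());
        // prints "[0, 2, 4] end=6"

        // One transaction for the whole batch of three messages.
        PartitionLog perBatch = new PartitionLog();
        System.out.println(produce(perBatch, 3, 3) + " end=" + perBatch.endOffset());
        // prints "[0, 1, 2] end=4"
    }
}
```

If this assumption holds, the gaps in the produced offsets would be expected rather than a sign of duplicated messages, and a tool that compares the log end offset with the last consumed data record could show a small apparent lag even though every message was delivered.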