I ran into an error after changing the destinationResolver of the DeadLetterPublishingRecoverer.
When I use:
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".ERR", cr.partition());
it works perfectly.
However, if I use _ERR instead of .ERR, the following error occurs:
2020-08-05 12:53:10,277 [kafka-producer-network-thread | producer-kafka-tx-group1.ABC_TEST_XPTO.0] WARN o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId=kafka-tx-group1.ABC_TEST_XPTO.0] Error while fetching metadata with correlation id 7 : {ABC_TEST_XPTO_ERR=INVALID_TOPIC_EXCEPTION}
2020-08-05 12:53:10,278 [kafka-producer-network-thread | producer-kafka-tx-group1.ABC_TEST_XPTO.0] ERROR org.apache.kafka.clients.Metadata - [Producer clientId=producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId=kafka-tx-group1.ABC_TEST_XPTO.0] Metadata response reported invalid topics [ABC_TEST_XPTO_ERR]
2020-08-05 12:53:10,309 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='null' and payload='XPTOEvent(super=Event(id=CAPBA2548, destination=ABC_TEST_XPTO, he...' to topic ABC_TEST_XPTO_ERR and partition 0:
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [ABC_TEST_XPTO_ERR]
2020-08-05 12:53:10,320 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.DeadLetterPublishingRecoverer - Dead-letter publication failed for: ProducerRecord(topic=ABC_TEST_XPTO_ERR, partition=0, headers=RecordHeaders(headers = ..
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [ABC_TEST_XPTO_ERR]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:573)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:388)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:290)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:226)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:54)
    at org.springframework.kafka.listener.FailedRecordTracker.skip(FailedRecordTracker.java:106)
My topic names use _ as a separator, for example ABC_TEST_XPTO, so I would like to name the dead-letter topic with an _ERR suffix, if possible.
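One thing that may be worth ruling out here: by Kafka's topic naming rules both candidate names are legal, but Kafka treats '.' and '_' as interchangeable when it checks for name collisions. The sketch below is a standalone illustration of those two rules, with no Kafka dependency; the class and method names (TopicNameCheck, isLegal, collides) are hypothetical. Whether a collision with an ABC_TEST_XPTO.ERR topic left over from the earlier test is the actual cause of the error above is an assumption that the logs alone do not confirm.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka or Spring Kafka.
public class TopicNameCheck {

    // Kafka topic names may contain ASCII letters, digits, '.', '_' and '-',
    // and may be at most 249 characters long.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    static boolean isLegal(String topic) {
        return !topic.equals(".") && !topic.equals("..")
                && topic.length() <= MAX_LENGTH
                && LEGAL.matcher(topic).matches();
    }

    // Two names collide when they are equal after '.' is replaced by '_'.
    static boolean collides(String a, String b) {
        return a.replace('.', '_').equals(b.replace('.', '_'));
    }

    public static void main(String[] args) {
        String source = "ABC_TEST_XPTO";
        String dotted = source + ".ERR";
        String underscored = source + "_ERR";

        System.out.println(isLegal(dotted));                // true
        System.out.println(isLegal(underscored));           // true
        System.out.println(collides(dotted, underscored));  // true
    }
}
```

If the collision turns out to be relevant, listing the topics that already exist on the broker would show whether ABC_TEST_XPTO.ERR is still present.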
My environment:
Spring Boot 2.3.2.RELEASE
Spring Kafka 2.5.3.RELEASE (the same problem occurs with 2.5.4.RELEASE)
Java 11
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + "_ERR", cr.partition());
@Component
class ContainerFactoryConfigurer {

    ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
            ChainedKafkaTransactionManager<?, ?> tm,
            KafkaTemplate<Object, Object> template) {

        factory.getContainerProperties().setTransactionManager(tm);

        DefaultAfterRollbackProcessor rollbackProcessor = new DefaultAfterRollbackProcessor(
                (record, exception) -> { },
                new FixedBackOff(0L, Long.valueOf(maxAttemps)), template, true);
        factory.setAfterRollbackProcessor(rollbackProcessor);

        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(
                        Collections.singletonMap(Object.class, template), DESTINATION_RESOLVER),
                new FixedBackOff(0L, Long.valueOf(maxAttemps)));
        errorHandler.setCommitRecovered(true);
        errorHandler.setAckAfterHandle(true);
        factory.setErrorHandler(errorHandler);
    }
}
Thanks,
DPG