4
votes

'I am trying to route a message to a Dead letter topic in Kafka in case of any failures in processing the corresponding message. I have setup the SeektoCurrentErrorHandler and DeadLetterPublishingRecoverer for this functionality.

The consumer throws the following exception while doing this:

2020-08-07 12:09:38.841 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='a6558a22-470d-4708-b297-814996a42045' and payload='{123, 34, 101, 118, 101, 110, 116, 78, 97, 109, 101, 34, 58, 34, 116, 101, 115, 116, 95, 101, 120, 1...' to topic test_execution.DLT and partition 2:

org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.

2020-08-07 12:09:38.846 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.l.DeadLetterPublishingRecoverer    : Dead-letter publication failed for: ProducerRecord(topic=test_execution.DLT, partition=2, headers=RecordHeaders(headers = [RecordHeader(key = kafka_dlt-original-topic, value = [116, 101, 115, 116, 95, 101, 120, 101, 99, 117, 116, 105, 111, 110]), RecordHeader(key = kafka_dlt-original-partition, value = [0, 0, 0, 2]), RecordHeader(key = kafka_dlt-original-offset, value = [0, 0, 0, 0, 0, 23, 15, -72]), RecordHeader(key = kafka_dlt-original-timestamp, value = [0, 0, 1, 115, -57, 103, -70, -126]), RecordHeader(key = kafka_dlt-original-timestamp-type, value = [67, 114, 101, 97, 116, 101, 84, 105, 109, 101]), RecordHeader(key = kafka_dlt-exception-fqcn, value = [111, 114, 103, 46, 115, 112, 114, 105, 110, 103, 102, 114, 97, 109, 101, 119, 111, 114, 107, 46, 107, 97, 102, 107, 97, 46, 108, 105, 115, 116, 101, 110, 101, 114, 46, 76, 105, 115, 116, 101, 110, 101, 114, 69, 120, 101, 99, 117, 116, 105, 111, 110, 70, 97, 105, 108, 101, 100, 69, 120, 99, 101, 112, 116, 105, 111, 110]), RecordHeader(key = kafka_dlt-exception-message, value = [    
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:385) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:278) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:214) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:54) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.FailedRecordProcessor.getSkipPredicate(FailedRecordProcessor.java:167) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:104) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1887) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1792) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1719) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1617) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1348) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1064) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.

I have already created the test_execution.DLT topic in the kafka cluster and I am clearly able to produce messages to this topic from the consoler-producer.

The consumer is running inside a docker container and the kafka cluster is a 3 VM setup. These are the configs used by the kafka consumer:

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put("spring.kafka.consumer.properties.spring.deserializer.key.delegate.class", StringDeserializer.class);
        props.put("spring.kafka.consumer.properties.spring.deserializer.value.delegate.class", JsonDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return props;
    }
    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),  new ErrorHandlingDeserializer<>(new StringDeserializer()),
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(AutomationEvent.class,false)));
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaOperations kafkaOperations) {
        return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaOperations), new FixedBackOff(10000, 3));
    }

Am I missing something here? Do I need to modify any server configuration for this to be updated?

3

3 Answers

3
votes

test_execution.DLT not present

The framework does not automatically create the dead letter topic for you; it has to exist already.

You can instruct the framework to create the topic by adding a NewTopic @Bean.

See this answer for an example.

EDIT

By default, we will send the record to the same partition so the DLT must have at least as many partitions as the original topic, unless you supply a destination resolver.

See the documentation.

By default, the dead-letter record is sent to a topic named .DLT (the original topic name suffixed with .DLT) and to the same partition as the original record. Therefore, when you use the default resolver, the dead-letter topic must have at least as many partitions as the original topic. If the returned TopicPartition has a negative partition, the partition is not set in the ProducerRecord, so the partition is selected by Kafka.

1
votes

Make sure you also specify the producer configuration if you want to send messages to topics from the consumers. The property for this is spring.kafka.producer.bootstrap-servers

This property is necessary or else the producer component tries to connect to locahost by default, which leads to no topics found.

0
votes

Consumer does not have to send any information to DLT, it is handled by the framework, only that the topic has to exist before

Follow this discussion -> DeadLetterPublishingRecoverer - Dead-letter publication failed with InvalidTopicException for name topic at TopicPartition ends with _ERR