
I am trying to send messages to a DLQ when a processing exception occurs, but I keep getting a serialization exception when I use SendToDlqAndContinue from the Spring Cloud Stream Kafka Streams binder (spring-cloud-stream-binder-kafka-streams).

@EnableBinding(ConsumerStreamsWay.KStreamBinding.class)
public class ConsumerStreamsWay {

@Autowired
private SendToDlqAndContinue dlqHandler;

@StreamListener
public void topic3Processor2(@Input("topic3") KStream<String, String> input) {

    input.process(() -> new Processor<String, String>() {
        ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            try {
                System.out.println("Received from topic3: " + value);
                if (value.startsWith("fail")) {
                    System.out.println("This is supposed to fail.");
                    throw new RuntimeException("Failing knowingly.");
                }
            } catch (Exception e) {
                //explicitly provide the kafka topic corresponding to the input binding as the first argument.
                //DLQ handler will correctly map to the dlq topic from the actual incoming destination.
                System.out.println("Going to send to DLQ..");
                dlqHandler.sendToDlq(new ConsumerRecord<>("topic3", context.partition(), context.offset(), key, value), e);
            }
        }

        @Override
        public void close() {
            //nothing needs to be done.
        }
    });
}

    interface KStreamBinding {
        @Input("topic3")
        KStream<String, String> input();
    }
}

Below is my exception stack:

org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:886) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856) ~[kafka-clients-2.3.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:592) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:404) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:241) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:214) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:159) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.SendToDlqAndContinue.sendToDlq(SendToDlqAndContinue.java:51) ~[spring-cloud-stream-binder-kafka-streams-3.0.4.RELEASE.jar:3.0.4.RELEASE]

I read somewhere that sendToDlq used only the default producer config up to a particular version; however, I am using the latest binder version, 3.0.4.RELEASE.

Any leads appreciated.

1 Answer

As the error indicates, the producer used for the DLQ is configured with ByteArraySerializer, so it expects byte arrays rather than Strings. Passing key.getBytes() and value.getBytes() should fix it:

dlqHandler.sendToDlq(new ConsumerRecord<>("topic3", context.partition(), context.offset(), key.getBytes(), value.getBytes()), e);
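One caveat with the call above: key.getBytes() throws a NullPointerException for a record without a key, and getBytes() with no argument uses the platform default charset. A minimal sketch of a null-safe helper follows, assuming the topic's keys and values are UTF-8 strings. DlqBytes and utf8 are hypothetical names introduced here for illustration; they are not part of the binder or of Kafka.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Spring Cloud Stream or Kafka).
// Assumption: keys and values on the topic are UTF-8 encoded strings.
final class DlqBytes {

    private DlqBytes() {
        // static helper only, no instances
    }

    // Encodes a String as UTF-8 bytes. A null input (for example, a record
    // without a key) stays null instead of throwing NullPointerException.
    static byte[] utf8(String s) {
        return s == null ? null : s.getBytes(StandardCharsets.UTF_8);
    }
}
```

With this in place, the arguments key.getBytes() and value.getBytes() in the call above would become DlqBytes.utf8(key) and DlqBytes.utf8(value); the rest of the call stays the same.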