I am trying to send messages to a DLQ when a processing exception occurs. However, I keep getting a SerializationException when I use SendToDlqAndContinue from the Spring Cloud Stream Kafka Streams binder (spring-cloud-stream-binder-kafka-streams).
Below is my code:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.SendToDlqAndContinue;

@EnableBinding(ConsumerStreamsWay.KStreamBinding.class)
public class ConsumerStreamsWay {

    @Autowired
    private SendToDlqAndContinue dlqHandler;

    @StreamListener
    public void topic3Processor2(@Input("topic3") KStream<String, String> input) {
        input.process(() -> new Processor<String, String>() {
            ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public void process(String key, String value) {
                try {
                    System.out.println("Received from topic3: " + value);
                    if (value.startsWith("fail")) {
                        System.out.println("This is supposed to fail.");
                        throw new RuntimeException("Failing knowingly.");
                    }
                } catch (Exception e) {
                    // Explicitly provide the Kafka topic corresponding to the input binding as the first argument.
                    // The DLQ handler will map to the DLQ topic from the actual incoming destination.
                    System.out.println("Going to send to DLQ..");
                    dlqHandler.sendToDlq(new ConsumerRecord<>("topic3", context.partition(), context.offset(), key, value), e);
                }
            }

            @Override
            public void close() {
                // Nothing needs to be done.
            }
        });
    }

    interface KStreamBinding {
        @Input("topic3")
        KStream<String, String> input();
    }
}
Below is my exception stack:
org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:886) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856) ~[kafka-clients-2.3.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:592) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:404) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:241) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:214) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:159) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.SendToDlqAndContinue.sendToDlq(SendToDlqAndContinue.java:51) ~[spring-cloud-stream-binder-kafka-streams-3.0.4.RELEASE.jar:3.0.4.RELEASE]
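As far as I can tell, the trace boils down to a String key being handed to a serializer that expects byte[]. Here is a minimal sketch of that cast failure without Kafka. The names `DlqCastSketch`, `serializeAsBytes` and `sendUntyped` are hypothetical stand-ins I made up for illustration; they are not the Kafka or binder classes.

```java
import java.nio.charset.StandardCharsets;

public class DlqCastSketch {

    // Hypothetical stand-in for a serializer typed for byte[]; not the Kafka class.
    static byte[] serializeAsBytes(byte[] data) {
        return data;
    }

    // Mimics a producer configured for byte[] that receives an arbitrary object:
    // the cast compiles, but fails at runtime when the object is not a byte[].
    static byte[] sendUntyped(Object key) {
        return serializeAsBytes((byte[]) key);
    }

    public static void main(String[] args) {
        try {
            sendUntyped("some-key"); // String key, as in my processor
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }

        // Converting the key to bytes first avoids the cast failure in this sketch.
        byte[] sent = sendUntyped("some-key".getBytes(StandardCharsets.UTF_8));
        System.out.println("Sent " + sent.length + " bytes");
    }
}
```

If that reading is right, building the ConsumerRecord passed to sendToDlq with byte[] key and value might sidestep the error, but I have not confirmed this against the binder.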
I have read somewhere that sendToDlq used only the default producer configuration up to a particular version; however, I am using the latest binder version, 3.0.4.RELEASE.
Any leads are appreciated.