I'm using the Spring Cloud Stream Kafka binder (not Kafka Streams) with a schema registry. When a message that cannot be deserialized arrives on the input topic, I want that message to be sent to a DLQ.
I tried the configuration below, but the application keeps retrying indefinitely and logs:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
  kafka:
    binder:
      brokers: localhost:9092
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          autoCommitOnError: true
          autoCommitOffset: true
    default:
      consumer:
        configuration:
          schema.registry.url: http://localhost:8081
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
What am I doing wrong? Please help.