
I am using Spring Cloud Stream with Avro and the Confluent Schema Registry. I am using a single DLQ topic for all services, so messages with different schemas may land in this topic. I have disabled dynamic schema registration so that an incorrect message cannot pass (spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=false).

However, because there is no single schema for the DLQ, a message may fail to be written to this topic and be lost. Therefore, I would like the ability to produce messages to the DLQ in JSON format while using Avro for the rest of the pipeline. I would appreciate it if someone could explain how this is achievable, or point me to an example.


1 Answer


If you are using Spring Cloud Stream 2.1 or later, disable DLQ processing in the binder and use a ListenerContainerCustomizer bean to add a custom ErrorHandler to the listener container. You can use a SeekToCurrentErrorHandler with a custom recoverer; the DeadLetterPublishingRecoverer is a good starting point - override this method...

/**
 * Subclasses can override this method to customize the producer record to send to the
 * DLQ. The default implementation simply copies the key and value from the consumer
 * record and adds the headers. The timestamp is not set (the original timestamp is in
 * one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
 * less than 0, it must be set to null in the {@link ProducerRecord}.
 * @param record the failed record
 * @param topicPartition the {@link TopicPartition} returned by the destination
 * resolver.
 * @param headers the headers - original record headers plus DLT headers.
 * @param data the value to use instead of the consumer record value.
 * @param isKey true if key deserialization failed.
 * @return the producer record to send.
 * @see KafkaHeaders
 */
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
        TopicPartition topicPartition, RecordHeaders headers, @Nullable byte[] data, boolean isKey) {
    ...
}
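As a sketch of how the pieces could fit together (class and bean names such as JsonDlqRecoverer are illustrative, not part of the answer above; exact constructor signatures vary slightly between spring-kafka 2.2 and 2.3): subclass DeadLetterPublishingRecoverer to re-serialize the failed record's value as JSON, then wire it into a SeekToCurrentErrorHandler via a ListenerContainerCustomizer.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.lang.Nullable;

import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
public class DlqConfig {

    /**
     * A recoverer that publishes the failed record's value to the DLQ as JSON
     * bytes, regardless of the serialization used on the main pipeline.
     * (Illustrative subclass; the override matches the signature quoted above.)
     */
    static class JsonDlqRecoverer extends DeadLetterPublishingRecoverer {

        private final ObjectMapper mapper = new ObjectMapper();

        JsonDlqRecoverer(KafkaTemplate<Object, Object> template) {
            super(template);
        }

        @Override
        protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
                TopicPartition topicPartition, RecordHeaders headers,
                @Nullable byte[] data, boolean isKey) {
            Object value;
            try {
                // Serialize the consumed (Avro-decoded) value as JSON bytes.
                value = this.mapper.writeValueAsBytes(record.value());
            }
            catch (Exception ex) {
                // Fall back to the original value if JSON conversion fails.
                value = record.value();
            }
            // Per the javadoc above: a negative partition must become null.
            Integer partition = topicPartition.partition() < 0 ? null : topicPartition.partition();
            return new ProducerRecord<>(topicPartition.topic(), partition, record.key(), value, headers);
        }
    }

    // Attach the error handler to the binder-created listener container;
    // binder DLQ processing (enableDlq) is assumed to be turned off.
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
            KafkaTemplate<Object, Object> dlqTemplate) {
        return (container, destination, group) -> container
                .setErrorHandler(new SeekToCurrentErrorHandler(new JsonDlqRecoverer(dlqTemplate), 3));
    }
}
```

The SeekToCurrentErrorHandler here retries three delivery attempts before invoking the recoverer; tune that (or use a BackOff in newer spring-kafka versions) to taste.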