I've got an integration flow that looks like the following:

@Bean
public IntegrationFlow auditFlow(@Qualifier("eventLoggingConnectionFactory") ConnectionFactory connectionFactory,
                                 @Qualifier("writeChannel") MessageChannel messageChannel,
                                 @Qualifier("parseErrorChannel") MessageChannel errorChannel) {
    return IntegrationFlows
            .from(Amqp.inboundAdapter(connectionFactory, auditQueue)
                    .errorChannel(errorChannel)
                    .concurrentConsumers(numConsumers)
                    .messageConverter(new MongoMessageConverter())) // converts JSON to org.bson.Document
            .enrichHeaders(e -> e.<Document>headerFunction(MongoWriteConfiguration.MONGO_COLLECTION_HEADER_KEY,
                    o -> getNamespace(o.getPayload())))
            .channel(messageChannel)
            .get();
}

The message converter can of course fail if a malformed message arrives, raising a MessageConversionException. In that case, I don't want the message requeued, but I also don't want to change the default to NOT requeue rejected messages, as I can do on the AmqpInboundAdapterSpec. What's the right way to not requeue messages that fail in that manner (and otherwise republish them for debugging purposes)?

More generally, downstream processes in the same flow can err for data that's more semantically malformed -- once again, I don't want to requeue them. I could throw an AmqpRejectAndDontRequeueException at those times, but then I'm losing separation of concerns, which is half the point of this. What's the right way to act on those exceptions -- perhaps there's a way to translate to AmqpRejectAndDontRequeueException?

1 Answer

The default error handler in the inbound adapter's SimpleMessageListenerContainer (a ConditionalRejectingErrorHandler) does just that (it detects a MessageConversionException and throws an AmqpRejectAndDontRequeueException).

You can customize a ConditionalRejectingErrorHandler by injecting your own FatalExceptionStrategy to look for other exception types and handle them the same way.

The DefaultExceptionStrategy looks like this...

@Override
public boolean isFatal(Throwable t) {
    if (t instanceof ListenerExecutionFailedException
            && t.getCause() instanceof MessageConversionException) {
        if (logger.isWarnEnabled()) {
            logger.warn("Fatal message conversion error; message rejected; "
                    + "it will be dropped or routed to a dead letter exchange, if so configured: "
                    + ((ListenerExecutionFailedException) t).getFailedMessage(), t);
        }
        return true;
    }
    return false;
}

The strategy is only invoked if there's not already an AmqpRejectAndDontRequeueException in the cause chain.
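As a rough illustration of a custom strategy for the "semantically malformed" case, here is a minimal sketch that walks the whole cause chain, since a downstream failure may arrive wrapped in several layers. To keep it compilable on its own, it declares a local stand-in for Spring AMQP's one-method FatalExceptionStrategy interface; MalformedAuditEventException and CauseChainFatalExceptionStrategy are hypothetical names, not part of any library.

```java
// Stand-in for Spring AMQP's FatalExceptionStrategy (a single isFatal method),
// declared locally only so this sketch compiles without Spring on the classpath.
interface FatalExceptionStrategy {
    boolean isFatal(Throwable t);
}

// Hypothetical application exception thrown downstream for semantically malformed data.
class MalformedAuditEventException extends RuntimeException {
    MalformedAuditEventException(String message) {
        super(message);
    }
}

// Treats a failure as fatal (reject, don't requeue) if the hypothetical
// exception type appears anywhere in the cause chain.
class CauseChainFatalExceptionStrategy implements FatalExceptionStrategy {
    @Override
    public boolean isFatal(Throwable t) {
        // Walk from the outermost wrapper down to the root cause.
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof MalformedAuditEventException) {
                return true;
            }
        }
        return false;
    }
}
```

In a real application you would drop the stand-in interface, implement org.springframework.amqp.rabbit.listener.FatalExceptionStrategy instead, and pass the strategy to the ConditionalRejectingErrorHandler constructor before setting that handler on the listener container. A strategy supplied this way replaces the default one, so it should also keep the MessageConversionException check (or extend DefaultExceptionStrategy) if conversion failures must still be rejected.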