3
votes

I am using springs SimpleMessageListenerContainer to consume messages from a RabbitMQ queue. Everything works fine, but when a invalid message is sent to the queue (e.g. invalid json) the listener just aborts, is shutting down the worker and doesn't accept any further messages.

Is it possible to configure it in a way that it discards the broken message and keeps listening to further messages?

I'm using sprint-rabbit-1.6.1.RELEASE.jar

My configuration looks like the following:

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                MessageListenerAdapter listenerAdapter,
                                                MessageConverter messageConverter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("my.queue");
    container.setMessageListener(listenerAdapter);
    container.setMessageConverter(messageConverter);
    return container;
}

@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
MessageListenerAdapter listenerAdapter(Worker worker) {
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
    messageListenerAdapter.setMessageConverter(new Jackson2JsonMessageConverter());
    return messageListenerAdapter;
}

The declaration of my listener method:

   public void processMessage(Map<String, String> message) {

When I send a message like '"routeId":"7"}' (broken json), then I get the Exception:

2016-09-02 08:10:35.821  WARN 35841 --- [   container-29] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) [spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at     org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted

2016-09-02 08:10:35.828 ERROR 35841 --- [   container-29]     o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception during processing

org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException: Invalid listener
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1351) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 1 common frames omitted
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted

2016-09-02 08:10:35.833 ERROR 35841 --- [   container-29] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2016-09-02 08:10:35.833  INFO 35841 --- [   container-29] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2016-09-02 08:10:35.833  INFO 35841 --- [   container-29] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

The fatal Exception in SimpleMessageListenerContainer is thrown here:

catch (ListenerExecutionFailedException ex) {
                    // Continue to process, otherwise re-throw
                    if (ex.getCause() instanceof NoSuchMethodException) {
                        throw new FatalListenerExecutionException("Invalid listener", ex);
                    }
                }

So it seems it is supposed to shut down if the container is configured with a non-existent method. But in case of a broken message, it's trying to call the method with a wrong parameter type, which also causes a NoSuchMethodException. This means that any producer can kill my consumer with a broken message.

Thanks for any suggestions!

1
I am not sure what you mean by But in case of a broken message, it's trying to call the method with a wrong parameter type, which also causes a NoSuchMethodException.. You need to show the complete stack trace, not an edited one and what version are you using? Normally, bad JSON causes MessageConversionExceptions which are treated specially and such messages are rejected.Gary Russell
@GaryRussell Thanks for your comment. I added the complete stacktrace, and also more configuration. It seems it's trying to call my listener method ('processMessage') with a String as argument, instead of just failing earlier when trying to parse the json.johannesv
See my answer for a work around - I also opened a JIRA Issue.Gary Russell

1 Answers

3
votes

Interesting; I was able to reproduce your issue; it turns out that if the message contains no __TypeID__ header (conversion hint), it simply returns the "bad" json as a String.

I was able to solve it by injecting a custom class mapper into the converter.

You could also have the sending system set the type header.

Then, the message is rejected because we get a MessageConversionException.

package com.example;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So39264965Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39264965Application.class, args);
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        template.convertAndSend("my.queue", new Foo());
        context.getBean(Worker.class).latch.await(60, TimeUnit.SECONDS);

        // bad json
        template.setMessageConverter(new SimpleMessageConverter());
        template.convertAndSend("", "my.queue", "\"routeId\":\"7\"}", m -> {
            m.getMessageProperties().setContentType("application/json");
            return m;
        });


        Thread.sleep(60000);
        context.close();
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter, MessageConverter messageConverter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("my.queue");
        container.setMessageListener(listenerAdapter);
        container.setMessageConverter(messageConverter);
        return container;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue queue() {
        return new Queue("my.queue");
    }

    @Bean
    public MessageConverter messageConverter() {
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        jackson2JsonMessageConverter.setClassMapper(new ClassMapper() {

            @Override
            public Class<?> toClass(MessageProperties properties) {
                return Foo.class;
            }

            @Override
            public void fromClass(Class<?> clazz, MessageProperties properties) {

            }

        });
        return jackson2JsonMessageConverter;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Worker worker) {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
        messageListenerAdapter.setMessageConverter(messageConverter());
        return messageListenerAdapter;
    }

    @Bean
    public Worker worker() {
        return new Worker();
    }

    public static class Worker {

        private final CountDownLatch latch = new CountDownLatch(1);

        public void processMessage(Foo foo) {
            System.out.println(foo);
            this.latch.countDown();
        }

    }

    public static class Foo {

        private String bar = "bar";

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}