I am using springs SimpleMessageListenerContainer to consume messages from a RabbitMQ queue. Everything works fine, but when a invalid message is sent to the queue (e.g. invalid json) the listener just aborts, is shutting down the worker and doesn't accept any further messages.
Is it possible to configure it in a way that it discards the broken message and keeps listening to further messages?
I'm using sprint-rabbit-1.6.1.RELEASE.jar
My configuration looks like the following:
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
MessageConverter messageConverter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("my.queue");
container.setMessageListener(listenerAdapter);
container.setMessageConverter(messageConverter);
return container;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
MessageListenerAdapter listenerAdapter(Worker worker) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
messageListenerAdapter.setMessageConverter(new Jackson2JsonMessageConverter());
return messageListenerAdapter;
}
The declaration of my listener method:
public void processMessage(Map<String, String> message) {
When I send a message like '"routeId":"7"}'
(broken json), then I get the Exception:
2016-09-02 08:10:35.821 WARN 35841 --- [ container-29] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) [spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted
2016-09-02 08:10:35.828 ERROR 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception during processing
org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException: Invalid listener
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1351) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 1 common frames omitted
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted
2016-09-02 08:10:35.833 ERROR 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
The fatal Exception in SimpleMessageListenerContainer is thrown here:
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
So it seems it is supposed to shut down if the container is configured with a non-existent method. But in case of a broken message, it's trying to call the method with a wrong parameter type, which also causes a NoSuchMethodException. This means that any producer can kill my consumer with a broken message.
Thanks for any suggestions!
But in case of a broken message, it's trying to call the method with a wrong parameter type, which also causes a NoSuchMethodException.
. You need to show the complete stack trace, not an edited one and what version are you using? Normally, bad JSON causesMessageConversionException
s which are treated specially and such messages are rejected. – Gary Russell