1
votes

I am getting "IllegalArgumentException" errors trying to do the following:

  • Receive an AMQP message that forces me to stop a "SimpleMessageListenerContainer".
  • Make an AMQP request using the "direct reply-to" mechanism.
  • Restart the "SimpleMessageListenerContainer"

I have a Project with a SimpleMessageListenerContainer, listening in a queue called “USER-broadcast-queue”:

@Bean("broadcastMessageListenerContainer")
public SimpleMessageListenerContainer broadcastMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(simpleRoutingConnectionFactory());
    container.setQueues(marketDataBroadcastQueue());
    container.setMessageListener(messageListenerAdapter());
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setDeclarationRetries(12);                    
    container.setFailedDeclarationRetryInterval(5000);
    container.setMismatchedQueuesFatal(true);
    container.setPrefetchCount(50);
    container.setAutoStartup(false);

    clientHandler.setBroadcastMessageListenerContainer(container);

    return container;
}

@Bean 
public RabbitTemplate rabbitTemplate() 
{
    RabbitTemplate template = new RabbitTemplate(simpleRoutingConnectionFactory());
    template.setMessageConverter(jsonMessageConverter());
    template.setUseDirectReplyToContainer(true);

    template.setRoutingKey(REQUEST_EXCHANGE_NAME);      
   template.setMandatory(true);
    template.setReplyTimeout(20000);    
    return template;
}

My ClientHandler is a vean defined like this:

@Component
public class ClientHandler{
        public void handleMessage(…) {….}
}



@Bean 
    public MessageListenerAdapter messageListenerAdapter() {
        return new MessageListenerAdapter(clientHandler, jsonMessageConverter());       
    }

Now, when recibe some kind of message, I need to stop the container:

broadcastMessageListenerContainer.stop();

Ask a request to the server from a RabbitGatewaySupport (using direct reply-to):

_response = getRabbitOperations().convertSendAndReceive(requestExchange, routingKeyInquiry, myObject, 
new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
…
return message;
            }
        });

And then start the listener again:

broadcastMessageListenerContainer.initialize();
broadcastMessageListenerContainer.start();

This is working fine the first time, but when I recibe a second message, I get this error when I try to send the request (convertSendAndReceive):

12-09-2019 13:54:13.826|DEBUG|USER|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|1233|[]|broadcastMessageListenerContainer-1|Shutting down Rabbit listener container
12-09-2019 13:54:13.826|TRACE| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1063|[]|broadcastMessageListenerContainer-1|AMQChannel(amqp:// USER @127.0.0.1:5671/,3) channel.isOpen()
12-09-2019 13:54:13.826|TRACE| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1063|[]|broadcastMessageListenerContainer-1|AMQChannel(amqp:// USER @127.0.0.1:5671/,3) channel.basicCancel([amq.ctag-6ztkTlFxReqUvLDJOsmmdQ])
12-09-2019 13:54:13.828|INFO | USER |org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|586|[]|broadcastMessageListenerContainer-1|Waiting for workers to finish.
12-09-2019 13:54:13.828|DEBUG| USER |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|886|[]|pool-7-thread-9|Received cancelOk for tag amq.ctag-6ztkTlFxReqUvLDJOsmmdQ (USER -broadcast-queue); Consumer@5c20aab9: tags=[[amq.ctag-6ztkTlFxReqUvLDJOsmmdQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5671/,3), conn: Proxy@46baf579 Shared Rabbit Connection: SimpleConnection@48b0e701 [delegate=amqp:// USER @127.0.0.1:5671/, localPort= 55757], acknowledgeMode=AUTO local queue size=0
12-09-2019 13:54:18.829|INFO | USER |org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|592|[]|broadcastMessageListenerContainer-1|Workers not finished.
12-09-2019 13:54:18.829|WARN | USER |org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|596|[]|broadcastMessageListenerContainer-1|Closing channel for unresponsive consumer: Consumer@5c20aab9: tags=[[amq.ctag-6ztkTlFxReqUvLDJOsmmdQ]], channel=Cached Rabbit Channel: AMQChannel(amqp:// [email protected]:5671/,3), conn: Proxy@46baf579 Shared Rabbit Connection: SimpleConnection@48b0e701 [delegate=amqp:// USER @127.0.0.1:5671/, localPort= 55757], acknowledgeMode=AUTO local queue size=0
12-09-2019 13:54:18.829|DEBUG| USER |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|735|[]|broadcastMessageListenerContainer-1|Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp:// USER @127.0.0.1:5671/,3), conn: Proxy@46baf579 Shared Rabbit Connection: SimpleConnection@48b0e701 [delegate=amqp:// [email protected]:5671/, localPort= 55757]
12-09-2019 13:54:18.829|TRACE| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1063|[]|broadcastMessageListenerContainer-1|AMQChannel(amqp:// USER @127.0.0.1:5671/,3) channel.close()
12-09-2019 13:54:18.830|DEBUG| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1276|[]|broadcastMessageListenerContainer-1|Closing cached Channel: AMQChannel(amqp:// USER @127.0.0.1:5671/,3)

    java.lang.IllegalArgumentException: Already value [[USER-broadcast-queue]] for key [org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory@a50b09c] bound to thread [broadcastMessageListenerContainer-1]
    at org.springframework.util.Assert.isNull(Assert.java:176) ~[MyLib-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.connection.SimpleResourceHolder.bind(SimpleResourceHolder.java:125) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.doConsumeFromQueue(DirectMessageListenerContainer.java:659) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.adjustConsumers(DirectMessageListenerContainer.java:313) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.setConsumersPerQueue(DirectMessageListenerContainer.java:161) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.getChannelHolder(DirectReplyToMessageListenerContainer.java:190) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithDirect(RabbitTemplate.java:1896) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:1762) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceiveRaw(RabbitTemplate.java:1731) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1600) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1591) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at es.siom.trading.amqp.gateway.RabbitServiceGateway.sendAndReceiveInquiry(RabbitServiceGateway.java:123) ~[Salida/:?]
    at es.siom.trading.amqp.ui.UIController.sendAndReceiveInquiry(UIController.java:67) ~[Salida/:?]
    at es.siom.trading.utils.ModuloTradingUtils.consultaContratosMercadoPublic(ModuloTradingUtils.java:627) ~[Salida/:?]
    at es.siom.trading.utils.ModuloTradingUtils.leeMercado(ModuloTradingUtils.java:133) ~[Salida/:?]
    at es.siom.trading.amqp.beans.DatosGeneralesLTS.actualizaOfertas(DatosGeneralesLTS.java:602) ~[Salida/:?]
    at es.siom.trading.amqp.handler.ClientHandler.handleMessage(ClientHandler.java:89) ~[Salida/:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_91]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_91]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_91]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_91]
    at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:280) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:363) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:292) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]

Any ideas, any help? Thank you.

1

1 Answers

1
votes

It's a bug.

It's an artifact of the fact that the reply-to container is initialized lazily; we shouldn't be binding its connection factory to the calling thread.

Please open a GitHub issue.

As a work around, you can delegate the send to another thread.

By the way, stopping the container on the listener thread will pause for the shutDownTimeout before actually stopping the active consumer(s).

EDIT

Issue.