1
votes

We're using spring-amqp 1.5.2, with RabbitMQ version 3.5.3. All queues work fine and we have consumers listening on them with no issues, except one consumer which keeps on dropping connections mysteriously. spring-amqp auto recovers, but after a few hours the consumers are disconnected and never come back up.

The queue is declared as

    @Bean()
public Queue analyzeTransactionsQueue(){
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000);
    return new Queue("analyze.txns", true, false, false, args);
}

Other queues are declared in a similar fashion, and have no issues.

The consumer (listener) is declared as

    @Bean
public SimpleRabbitListenerContainerFactory analyzeTransactionListenerContainerFactory(ConnectionFactory connectionFactory, AsyncTaskExecutor asyncTaskExecutor) {
connectionFactory.getVirtualHost());
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(2);
    factory.setMaxConcurrentConsumers(4);
    factory.setTaskExecutor(asyncTaskExecutor);
    ConsumerTagStrategy consumerTagStrategy = new ConsumerTagStrategy() {
        @Override
        public String createConsumerTag(String queue) {
            return queue;
        }
    };
    factory.setConsumerTagStrategy(consumerTagStrategy);
    return factory;
}

Again, other consumers having no issues are declared in a similar fashion.

The code after the message is received has no exceptions. Even after turning on DEBUG logging for SimpleMessageListenerContainer, there are no errors in the logs.

LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Cancelling Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,47), acknowledgeMode=AUTO local queue size=0; 
LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Idle consumer terminating: Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,47), acknowledgeMode=AUTO local queue size=0; 

Any ideas on why this would be happening. Have tried DEBUG logging but to no avail.

2
Well. There should not be any surprises. Share, please, logs when you see the dropped connection. Maybe you have some policy on the Broker to drop queue after some expires? rabbitmq.com/ttl.htmlArtem Bilan
I work with @nsdiv and I can say that we do set Per-Queue Message TTL (as shown in the code snippet). However, the symptom we're seeing is that the consumers just disconnect from the queue even though messages are being sent to the queue. Since there are no connected consumers, the TTL kicks in and marks messages as dead after the specified TTL (expected behavior). What's mysterious is the consumers dropping amidst normal processing. All other queues' consumers do just fine.shrisha
@ArtemBilan no, there is no policy on the Broker to drop queue. In fact, the queue still exists, only the consumers go away. Unfortunately there isn't much logging available, even in debug mode (I put SimpleMessageListenerContainer and AbstractMessageListenerContainer in debug).nsdiv
Looks like we agreed with you about the reason and solution: stackoverflow.com/questions/35584700/…. I'd close the question to avoid extra noise.Artem Bilan
While the logging problem is solved, we still haven't figured out the issue of the mysterious dropped consumers. We're going to turn on full TRACE logging tonight to see if we get more information. Stay tuned.nsdiv

2 Answers

0
votes

one thing I have observed is that consumer would disconnect if there's an exception during parsing and it doesn't always log the problem, depending on your logging config...

since then, I always wrap the handleDelivery method into a try catch, to get better logging and no connection drop :

consumer = new DefaultConsumer(channel) {

        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {

            log.info("processing message - content : " + new String(body, "UTF-8"));

           try {
                MyEvent myEvent = objectMapper.readValue(new String(body, "UTF-8"), MyEvent.class);
                processMyEvent(myEvent);

            } catch (Exception exp) {
                log.error("couldn't process "+MyEvent.class+" message : ", exp);
            }
        }
    }; 
0
votes

Looking at the way you have configured things, it is pretty obvious that you have enabled dynamic scaling of consumers.

factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);

There was a threading issue that I submitted a fix for which caused number of consumers to drop to zero. This was happening while consumers were scaling down.

By the looks of it, you have been a victim of that problem. The fix has been back-ported I believe and can be seen here

Try using the latest version and see whether you get the same problem.