0
votes

I have an application which receives a message from one queue, processes it and sends it to another queue. When it's receiving a lot of messages (20 thousand or more), spring shows me this message when it tries to send the message to another queue:

connection error; protocol method: #method<connection.close>(reply-code=504 reply-text=CHANNEL_ERROR - second 'channel.open' seen class-id=20 method-id=10)

So I raised the channel cache size and created two CachingConnectionFactory one for consumer and another for the producer, this configurations I followed a note from spring doc:

When the application is configured with a single CachingConnectionFactory, as it is by default with Spring Boot auto-configuration, the application will stop working when the connection is blocked by the Broker. And when it is blocked by the Broker, any its clients stop to work. If we have producers and consumers in the same application, we may end up with a deadlock when producers are blocking the connection because there are no resources on the Broker anymore and consumers can’t free them because the connection is blocked. To mitigate the problem, there is just enough to have one more separate CachingConnectionFactory instance with the same options - one for producers and one for consumers. The separate CachingConnectionFactory isn’t recommended for transactional producers, since they should reuse a Channel associated with the consumer transactions.

Following this recommendations the error message disappeared, but now the application suddenly stops, it's not sending or receiving new messages and all queues are idle. It's kind strange because it has a low concurrency number on listener. What am I missing?

Configuration:

Spring Boot: 2.0.8.RELEASE

Spring AMQP: 2.0.11.RELEASE

RabbitMQ: 3.8.8

spring:
  rabbitmq:
    listener:
      simple:
        default-requeue-rejected: false
        concurrency: 5
        max-concurrency: 8
    cache:
      channel:
        size: 1000

  @Bean
    public ConnectionFactory consumerConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(properties.getHost());
        connectionFactory.setPort(properties.getPort());
        connectionFactory.setUsername(properties.getUsername());
        connectionFactory.setPassword(properties.getPassword());
        connectionFactory.setChannelCacheSize(properties.getCache().getChannel().getSize());
        connectionFactory.setConnectionNameStrategy(cns());

        return connectionFactory;
    }

    @Bean
    public ConnectionFactory producerConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(properties.getHost());
        connectionFactory.setPort(properties.getPort());
        connectionFactory.setUsername(properties.getUsername());
        connectionFactory.setPassword(properties.getPassword());
        connectionFactory.setChannelCacheSize(properties.getCache().getChannel().getSize());
        connectionFactory.setConnectionNameStrategy(cns());

        return connectionFactory;
    }

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("consumerConnectionFactory") ConnectionFactory consumerConnectionFactory,
                                                                           SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                           RabbitProperties properties) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setErrorHandler(errorHandler());
    factory.setConcurrentConsumers(properties.getListener().getSimple().getConcurrency());
    factory.setMaxConcurrentConsumers(properties.getListener().getSimple().getMaxConcurrency());
    configurer.configure(factory, consumerConnectionFactory);
    return factory;
}

@Bean
@Primary
public RabbitAdmin producerRabbitAdmin() {
    return new RabbitAdmin(producerConnectionFactory());
}

@Bean
public RabbitAdmin consumerRabbitAdmin() {
    return new RabbitAdmin(consumerConnectionFactory());
}

@Bean
@Primary
public RabbitTemplate producerRabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(producerConnectionFactory());
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    return rabbitTemplate;
}

@Bean
public RabbitTemplate consumerRabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(consumerConnectionFactory());
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    return rabbitTemplate;
}
1
It's hard to tell what's wrong; please provide a complete small example that exhibits this behavior. - Gary Russell
Hi @GaryRussell thanks for answering, the problem was due to application memory limit. - rfel

1 Answers

0
votes

After analize, the problem was due to Java Memory Heap limit. Besides, I updated my configuration, removed ConnectionFactory beans, and set a publisher factory to RabbitTemplate

So I ended with this:

@Bean
@Primary
public RabbitTemplate producerRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    rabbitTemplate.setUsePublisherConnection(true);
    return rabbitTemplate;
}

@Bean
public RabbitTemplate consumerRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    return rabbitTemplate;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                           SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                           RabbitProperties properties) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setErrorHandler(errorHandler());
    factory.setConcurrentConsumers(properties.getListener().getSimple().getConcurrency());
    factory.setMaxConcurrentConsumers(properties.getListener().getSimple().getMaxConcurrency());
    configurer.configure(factory, connectionFactory);
    return factory;
}

With this configuration memory consume was reduced and I was able to raise consumer concurrey numbers:

spring:
  rabbitmq:
    listener:
      simple:
        default-requeue-rejected: false
        concurrency: 10
        max-concurrency: 15
    cache:
      channel:
        size: 1000

I'm looking now for the right cache channel size and to raise even more concurrency numbers.