I'm using spring-amqp:2.1.6.RELEASE

I have a RabbitTemplate with a PublisherReturn callback.

  • If I send a message to a routingKey which has no queues bound to it, then the return callback is called correctly. When this happens I want to send the message to an alternative routingKey. However, if I use the RabbitTemplate in the ReturnCallback, the call just hangs: nothing indicates whether the message was sent, the RabbitTemplate never returns control to my ReturnCallback, and I don't see any PublisherConfirm either.
  • If I create a new RabbitTemplate (with the same CachingConnectionFactory), it behaves the same way: the call just hangs.
  • If I send a message to a routingKey which does have a queue bound to it, then the message correctly arrives at the queue. The ReturnCallback is not called in this scenario.

After some investigation, I've come to the conclusion that the rabbitTemplate and/or connection is blocked until the original message is completely processed.

If I create a second CachingConnectionFactory and RabbitTemplate, and use these in the PublisherReturn callback, then it seems to work fine.

So, here's the question: What is the best way to send a message in a PublisherReturn callback using spring-amqp?

I have searched, but can't find anything that explains how you should do this.

Here are simplified details of what I have:

@Configuration
public class MyConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setPublisherReturns(true);
        // ... other settings left out for brevity
        return connectionFactory;
    }

    @Bean
    @Qualifier("rabbitTemplate")
    public RabbitTemplate rabbitTemplate(ReturnCallbackForAlternative returnCallbackForAlternative) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(returnCallbackForAlternative);
        // ... other settings left out for brevity
        return rabbitTemplate;
    }

    @Bean
    @Qualifier("connectionFactoryForUndeliverable")
    public ConnectionFactory connectionFactoryForUndeliverable() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        // ... other settings left out for brevity
        return connectionFactory;
    }

    @Bean
    @Qualifier("rabbitTemplateForUndeliverable")
    public RabbitTemplate rabbitTemplateForUndeliverable() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactoryForUndeliverable());
        // ... other settings left out for brevity
        return rabbitTemplate;
    }

}

Then to send the message I'm using

    @Autowired
    @Qualifier("rabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    public void send(Message message) {
        rabbitTemplate.convertAndSend(
            "exchange-name",
            "primary-key",
            message);
    }

And the code in the ReturnCallback is

@Component
public class ReturnCallbackForAlternative implements RabbitTemplate.ReturnCallback {

    @Autowired
    @Qualifier("rabbitTemplateForUndeliverable")
    private RabbitTemplate rabbitTemplate;

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        rabbitTemplate.convertAndSend(
            "exchange-name",
            "alternative-key",
            message);
    }

}

EDIT

Simplified example to reproduce the problem. To run it:

  1. Have RabbitMQ running
  2. Have an exchange called foo bound to a queue called foo
  3. Run it as a Spring Boot app

You'll see the following output:

in returnCallback before message send

but you won't see:

in returnCallback after message send

If you comment out the line connectionFactory.setPublisherConfirms(true); then it runs OK.

@SpringBootApplication
public class HangingApplication {

    public static void main(String[] args) {
      SpringApplication.run(HangingApplication.class, args);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
      CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
      connectionFactory.setPublisherReturns(true);
      connectionFactory.setPublisherConfirms(true);
      return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setExchange("foo");
      rabbitTemplate.setMandatory(true);

      rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        System.out.println("Confirm callback for main template. Ack=" + ack);
      });

      rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        System.out.println("in returnCallback before message send");
        rabbitTemplate.send("foo", message);
        System.out.println("in returnCallback after message send");
      });

      return rabbitTemplate;
    }

    @Bean
    public ApplicationRunner runner(@Qualifier("rabbitTemplate") RabbitTemplate template) {

      return args -> {
        template.convertAndSend("BADKEY", "foo payload");
      };
    }

    @RabbitListener(queues = "foo")
    public void listen(String in) {
      System.out.println("Message received on undeliverable queue : " + in);
    }

}

Here's the build.gradle I used:

plugins {
    id 'org.springframework.boot' version '2.1.5.RELEASE'
    id 'java'
}

apply plugin: 'io.spring.dependency-management'

group 'pcoates'
version '1.0-SNAPSHOT'

sourceCompatibility = '11'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.springframework.boot:spring-boot-starter-amqp'
}

1 Answer

Sending from within the return callback causes some kind of deadlock down in the amqp-client code. The simplest solution is to do the send on a separate thread: use a TaskExecutor within the callback...

exec.execute(() -> template.send(...));

You can use the same template/connection factory, but the send must run on a different thread.
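
As a rough illustration of that hand-off, here is a sketch in plain Java with no Spring on the classpath. The Sender interface is a hypothetical stand-in for the RabbitTemplate, and the class, method, and thread names are made up for the example. It only shows the callback returning immediately while the re-send runs on another thread; it does not talk to a broker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ReturnCallbackHandoff {

    /** Hypothetical stand-in for the single RabbitTemplate operation used here. */
    interface Sender {
        void send(String routingKey, String payload);
    }

    /** Builds a callback that re-sends on the given executor, not on the calling thread. */
    static Consumer<String> resendOn(ExecutorService exec, Sender sender, String alternativeKey) {
        return payload -> exec.execute(() -> sender.send(alternativeKey, payload));
    }

    /** Runs the hand-off once and returns the name of the thread that did the re-send. */
    static String demo() {
        // Models the thread that delivers returns; the name is illustrative only.
        ExecutorService ioThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        // Separate thread that performs the re-send.
        ExecutorService resendPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "resend-thread"));
        CountDownLatch sent = new CountDownLatch(1);
        String[] sendThread = new String[1];

        // Fake sender: records which thread it was called on.
        Sender sender = (key, payload) -> {
            sendThread[0] = Thread.currentThread().getName();
            sent.countDown();
        };
        Consumer<String> returnCallback = resendOn(resendPool, sender, "alternative-key");

        try {
            // The callback is invoked on the I/O thread and returns immediately.
            ioThread.execute(() -> returnCallback.accept("foo payload"));
            if (!sent.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("re-send did not run");
            }
            return sendThread[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdown();
            resendPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("re-send ran on: " + demo());
    }
}
```

In a real application the executor would typically be a Spring TaskExecutor bean, and the lambda body would be the template.send(...) call shown above.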

I thought we had recently changed the framework to always call the return callback on a different thread (after the last person reported this), but it looks like it fell through the cracks.

I opened an issue this time.

EDIT

Are you sure you're using 2.1.6?

We fixed this problem in 2.1.0 by preventing the send from attempting to use the same channel that the return arrived on. This works fine for me...

@SpringBootApplication
public class So57234770Application {

    public static void main(String[] args) {
        SpringApplication.run(So57234770Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            template.send("foo", message);
        });
        return args -> {
            template.convertAndSend("BADKEY", "foo");
        };
    }

    @RabbitListener(queues = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}

If you can provide a sample app that exhibits this behavior, I will take a look to see what's going on.