
I have an app that will listen to multiple queues, which are declared on different vhosts. I use a SimpleRoutingConnectionFactory to hold a connectionFactoryMap, and I would like to set up my listeners with @RabbitListener.

According to Spring AMQP doc:

Also starting with version 1.4, you can configure a routing connection factory in a SimpleMessageListenerContainer. In that case, the list of queue names is used as the lookup key. For example, if you configure the container with setQueueNames("foo, bar"), the lookup key will be "[foo,bar]" (no spaces).
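
To make the quoted rule concrete, here is a minimal sketch in plain Java of how such a key could be derived from a list of queue names. It only mimics the documented result (the list rendered as a string, spaces removed); the class LookupKeySketch and its lookupKey method are hypothetical names for illustration and are not part of Spring AMQP.

```java
import java.util.Arrays;
import java.util.List;

final class LookupKeySketch {

    // Render the queue names as a list string and strip the spaces,
    // which yields keys of the form described in the documentation.
    static String lookupKey(String... queueNames) {
        List<String> names = Arrays.asList(queueNames);
        return names.toString().replaceAll(" ", "");
    }

    public static void main(String[] args) {
        System.out.println(lookupKey("foo", "bar")); // prints [foo,bar]
        System.out.println(lookupKey("baz"));        // prints [baz]
    }
}
```

Under this assumption, the keys stored in the routing connection factory's map would have to match these strings exactly, brackets included.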

I used @RabbitListener(queues = "some-key"). Unfortunately, Spring complained about "lookup key [null]"; see the log below.

18:52:44.528 WARN --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it

java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:119)
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:97)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:90)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:76)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:472)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1306)
    at java.lang.Thread.run(Thread.java:745)

  1. Did I do something wrong? If the queues attribute is used as the lookup key (for the connection factory lookup), what am I supposed to use to specify which queue I'd like to listen to?

  2. Ultimately, I hope to set up listeners programmatically and dynamically. If I use "Programmatic Endpoint Registration", do I have to give up "Annotation-driven listener endpoints"? I love annotation-driven listener endpoints because a listener can have multiple message handler methods, each taking a different incoming data type as its argument, which is very clean and tidy. With Programmatic Endpoint Registration, I would have to parse the incoming Message myself and call a particular custom message handler based on the message type or content.
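
To illustrate the kind of manual dispatch described in point 2, here is a minimal sketch in plain Java with no Spring types. It assumes the message body has already been converted to a Java object; the class PayloadDispatcherSketch and its on and dispatch methods are hypothetical names, not an existing API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class PayloadDispatcherSketch {

    // Handlers keyed by payload type, checked in registration order.
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    // Register a handler for one payload type; the cast is safe because
    // dispatch() only calls it after an isInstance check.
    <T> PayloadDispatcherSketch on(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    // Route the payload to the first handler whose type matches it.
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        throw new IllegalArgumentException("No handler for payload: " + payload);
    }

    public static void main(String[] args) {
        PayloadDispatcherSketch dispatcher = new PayloadDispatcherSketch()
                .on(Integer.class, i -> "int: " + i)
                .on(String.class, s -> "str: " + s);
        System.out.println(dispatcher.dispatch(42));       // prints int: 42
        System.out.println(dispatcher.dispatch("fizbuz")); // prints str: fizbuz
    }
}
```

This is the boilerplate that multi-method annotated listeners would otherwise take care of, which is why I would prefer to keep them.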

EDIT: Hi Gary, I modified your code #2 a little so that it uses Jackson2JsonMessageConverter to serialize objects (in the RabbitTemplate bean) and to deserialize them back into objects (in the inboundAdapter). I also removed @RabbitListener because in my case all listeners are added at runtime. Now fooBean receives Integer, String and TestData messages without any problem! The only remaining issue is that the program repeatedly logs this warning:

"[erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it

java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]". For the full stack trace, see the end of this post.

Did I miss anything?

@SpringBootApplication
public class App2 implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(App2.class, args);
    }

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ConnectionFactory routingCf;

    @Autowired
    private RabbitTemplate template;

    @Override
    public void run(String... args) throws Exception {
        // dynamically add a listener for queue qux
        IntegrationFlow flow = IntegrationFlows.from(Amqp.inboundAdapter(this.routingCf, "qux").messageConverter(new Jackson2JsonMessageConverter()))
                .handle(fooBean())
                .get();
        this.flowContext.registration(flow).register();

        // now test it
        SimpleResourceHolder.bind(this.routingCf, "[qux]");
        this.template.convertAndSend("qux", 42);
        this.template.convertAndSend("qux", "fizbuz");
        this.template.convertAndSend("qux", new TestData(1, "test"));
        SimpleResourceHolder.unbind(this.routingCf);
    }

    @Bean
    RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(routingCf);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    @Primary
    public ConnectionFactory routingCf() {
        SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
        Map<Object, ConnectionFactory> map = new HashMap<>();
        map.put("[foo,bar]", routedCf());
        map.put("[baz]", routedCf());
        map.put("[qux]", routedCf());
        rcf.setTargetConnectionFactories(map);
        return rcf;
    }

    @Bean
    public ConnectionFactory routedCf() {
        return new CachingConnectionFactory("127.0.0.1");
    }

    @Bean
    public Foo fooBean() {
        return new Foo();
    }

    public static class Foo {

        @ServiceActivator
        public void handleInteger(Integer in) {
            System.out.println("int: " + in);
        }

        @ServiceActivator
        public void handleString(String in) {
            System.out.println("str: " + in);
        }

        @ServiceActivator
        public void handleData(TestData data) {
            System.out.println("TestData: " + data);
        }
    }
}

Full stack trace:

2017-03-15 21:43:06.413  INFO 1003 --- [           main] hello.App2                               : Started App2 in 3.003 seconds (JVM running for 3.69)
2017-03-15 21:43:11.415  WARN 1003 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it

java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:119) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:97) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1430) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1387) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:500) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin$11.onCreate(RabbitAdmin.java:419) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:33) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:571) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:90) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:76) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:505) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1382) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_112]

1 Answer


Please show your configuration - it works fine for me...

@SpringBootApplication
public class So42784471Application {

    public static void main(String[] args) {
        SpringApplication.run(So42784471Application.class, args);
    }

    @Bean
    @Primary
    public ConnectionFactory routing() {
        SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
        Map<Object, ConnectionFactory> map = new HashMap<>();
        map.put("[foo,bar]", routedCf());
        map.put("[baz]", routedCf());
        rcf.setTargetConnectionFactories(map);
        return rcf;
    }

    @Bean
    public ConnectionFactory routedCf() {
        return new CachingConnectionFactory("10.0.0.3");
    }

    @RabbitListener(queues = { "foo" , "bar" })
    public void foobar(String in) {
        System.out.println(in);
    }

    @RabbitListener(queues = "baz")
    public void bazzer(String in) {
        System.out.println(in);
    }

}

Regarding your second question, you could build the endpoint manually but it's quite involved. It's probably easier to use a similar feature in a Spring Integration @ServiceActivator.

I will update this answer with details shortly.

EDIT

And here's the update using Spring Integration techniques to dynamically add a multi-method listener at runtime...

@SpringBootApplication
public class So42784471Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So42784471Application.class, args);
    }

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ConnectionFactory routingCf;

    @Autowired
    private RabbitTemplate template;

    @Override
    public void run(String... args) throws Exception {
        // dynamically add a listener for queue qux
        IntegrationFlow flow = IntegrationFlows.from(Amqp.inboundAdapter(this.routingCf, "qux"))
                .handle(fooBean())
                .get();
        this.flowContext.registration(flow).register();

        // now test it
        SimpleResourceHolder.bind(this.routingCf, "[qux]");
        this.template.convertAndSend("qux", 42);
        this.template.convertAndSend("qux", "fizbuz");
        SimpleResourceHolder.unbind(this.routingCf);
    }


    @Bean
    @Primary
    public ConnectionFactory routingCf() {
        SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
        Map<Object, ConnectionFactory> map = new HashMap<>();
        map.put("[foo,bar]", routedCf());
        map.put("[baz]", routedCf());
        map.put("[qux]", routedCf());
        rcf.setTargetConnectionFactories(map);
        return rcf;
    }

    @Bean
    public ConnectionFactory routedCf() {
        return new CachingConnectionFactory("10.0.0.3");
    }

    @RabbitListener(queues = { "foo" , "bar" })
    public void foobar(String in) {
        System.out.println(in);
    }

    @RabbitListener(queues = "baz")
    public void bazzer(String in) {
        System.out.println(in);
    }

    @Bean
    public Foo fooBean() {
        return new Foo();
    }

    public static class Foo {

        @ServiceActivator
        public void handleInteger(Integer in) {
            System.out.println("int: " + in);
        }

        @ServiceActivator
        public void handleString(String in) {
            System.out.println("str: " + in);
        }

    }

}
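
A note on the test code in run(): SimpleResourceHolder manages its bindings per thread, so the key bound before convertAndSend is only visible on the thread that bound it. The sketch below shows that behaviour with a plain ThreadLocal standing in for the holder; ThreadBoundKeySketch and its methods are hypothetical names, not Spring classes. It also wraps the work in try/finally so the key is always unbound.

```java
final class ThreadBoundKeySketch {

    // Stand-in for thread-bound storage; NOT the Spring implementation.
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    static void bind(String key) {
        KEY.set(key);
    }

    static void unbind() {
        KEY.remove();
    }

    static String currentKey() {
        return KEY.get();
    }

    // Read the key from a freshly started thread, which has no binding of its own.
    static String keySeenByOtherThread() throws InterruptedException {
        String[] seen = new String[1];
        Thread other = new Thread(() -> seen[0] = currentKey());
        other.start();
        other.join();
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        bind("[qux]");
        try {
            System.out.println("binding thread: " + currentKey());         // prints binding thread: [qux]
            System.out.println("other thread: " + keySeenByOtherThread()); // prints other thread: null
        } finally {
            unbind(); // always release the binding, even if sending fails
        }
    }
}
```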