2
votes

I am a newer to spring amqp. When config spring amqp publisher Confirms and returns , met problems .

amqp config :

SimpleMessageListenerContainer container(CachingConnectionFactory connectionFactory, @Qualifier("topicListenerAdapter")MessageListenerAdapter listenerAdapter) {

    connectionFactory.setChannelCacheSize(5);
    connectionFactory.setPublisherConfirms(true);
    connectionFactory.setPublisherReturns(true);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("request.queue","reply.queue");
    container.setMessageConverter(json2MessageConverter());
    container.setReceiveTimeout(3000);
    container.setMessageListener(listenerAdapter);

    return container;
}

send message:

rabbitTemplate.convertAndSend("spring-boots5", message);
        rabbitTemplate.setConfirmCallback(new ConfirmCallback(){

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // TODO Auto-generated method stub

                System.out.println("confirm correlationData is : "+correlationData+"ack is : "+
                ack);
            }

        });
        rabbitTemplate.setMandatory(true);

When running this application , amqp message received :

Body:'This is my first message'MessageProperties [headers={bar=baz}, timestamp=null, messageId=123456, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=request.queue, deliveryTag=1, messageCount=0]) >

met en error :

22 20:48:08.661[0;39m [31mERROR[0;39m [35m37792[0;39m [2m---[0;39m [2m[ 127.0.0.1:5672][0;39m [36mo.s.a.r.s.PublisherCallbackChannelImpl  [0;39m [2m:[0;39m No listener for seq:1

And no expecting string on console :

"confirm correlationData is : "+correlationData+"ack is : "+ ack

And not know how to config Reply Messaging (i use java config)

1

1 Answers

4
votes

You need to set the ConfirmCallback (and ReturnCallback) before sending the message.

You also need to supply some correlation data on the send so you can determine which outbound message the confirm is for.

See this test case...

@Test
public void testPublisherConfirmWithSendAndReceive() throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<CorrelationData> confirmCD = new AtomicReference<CorrelationData>();
    templateWithConfirmsEnabled.setConfirmCallback(new ConfirmCallback() {

        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            confirmCD.set(correlationData);
            latch.countDown();
        }
    });
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactoryWithConfirmsEnabled);
    container.setQueueNames(ROUTE);
    container.setMessageListener(new MessageListenerAdapter(new Object() {

        @SuppressWarnings("unused")
        public String handleMessage(String in) {
            return in.toUpperCase();
        }
    }));
    container.start();
    CorrelationData correlationData = new CorrelationData("abc");
    String result = (String) this.templateWithConfirmsEnabled.convertSendAndReceive(ROUTE, (Object) "message", correlationData);
    container.stop();
    assertEquals("MESSAGE", result);
    assertTrue(latch.await(10, TimeUnit.SECONDS));
    assertEquals(correlationData, confirmCD.get());
}