I am new to spring-amqp. I am trying to manually acknowledge the messages instead of using auto-ack.

The management console shows the last message as unacked, even though the queue is empty.

[Image: unacked message in the management console]

As soon as I stop the server, the last message gets acknowledged. How do I handle this, and how can I log the message ID/information of the message that was left unacknowledged?

Here is the code which I have implemented.

RabbitMQConfig.java:

public class RabbitMQConfig {

final static String queueName = "spring-boot";

@Bean
Queue queue() {
    return new Queue(queueName, true,false,false,null);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange("spring-boot-exchange");
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(queueName);
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                         MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return container;
}

@Bean
Consumer receiver() {
    return new Consumer();
}

@Bean
MessageListenerAdapter listenerAdapter(Consumer receiver) {
    return new MessageListenerAdapter(receiver, "receiveMessage");
}

}

Consumer.java:

public class Consumer implements ChannelAwareMessageListener{

@RabbitListener(queues = "spring-boot")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
        throws IOException, InterruptedException {
    Thread.sleep(500);
    channel.basicAck(tag, true);
    System.out.println(tag + "received");
}

@Override
public void onMessage(Message arg0, Channel arg1) throws Exception {
    // TODO Auto-generated method stub

}

}

Producer endpoints:

@RestController
public class HelloController {

private final RabbitTemplate rabbitTemplate;

public HelloController(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;

}

// Call this end point from the postman or the browser then check in the
// rabbitmq server
@GetMapping(path = "/hello")
public String sayHello() throws InterruptedException {
    // Producer operation
    for (int i = 0; i < 100; i++) {
        Thread.sleep(500);
        rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "Hello World");
    }
    return "hello";
}

@GetMapping(path = "/hellotwo")
public String sayHellotwo() throws InterruptedException {
    // Producer operation
    for (int i = 0; i < 50; i++) {
        Thread.sleep(500);
        rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "SEcond message");

    }
    return "hellotwo";
}

}

1 Answer

You have two listener containers: the container bean and the one the framework creates for the @RabbitListener.

I am not entirely sure what's happening without running a test myself, but I suspect the problem is your attempt to call receiveMessage from the simple MessageListenerAdapter.

That adapter is only designed to call a method with one argument (converted from the Message). It also doesn't know how to map @Header parameters. I suspect that the delivery fails and, because you are using MANUAL acks, that message is never acknowledged. With the default qos (prefetch) of 1, the broker then attempts no more deliveries to that container while that delivery remains unack'd.
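To illustrate the suspected stall, here is a toy model in plain Java. It does not use RabbitMQ at all; ToyBroker, consume and PrefetchDemo are made-up names for this sketch. It only mimics the rule that a broker stops delivering to a consumer once the number of unacked deliveries reaches the prefetch limit.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

public class PrefetchDemo {

    /** Toy broker (hypothetical): delivers only while unacked deliveries < prefetch. */
    static final class ToyBroker {
        private final Deque<String> ready = new ArrayDeque<>();
        private final Set<Long> unacked = new HashSet<>();
        private final int prefetch;
        private long nextTag = 1;

        ToyBroker(int prefetch) {
            this.prefetch = prefetch;
        }

        void publish(String body) {
            ready.add(body);
        }

        /** Returns the delivery tag of the next delivery, or -1 if blocked or empty. */
        long deliver() {
            if (unacked.size() >= prefetch || ready.isEmpty()) {
                return -1;
            }
            ready.poll();
            long tag = nextTag++;
            unacked.add(tag);
            return tag;
        }

        /** Acknowledges a single delivery, freeing one prefetch slot. */
        void ack(long tag) {
            unacked.remove(tag);
        }
    }

    /** Publishes the given number of messages and returns how many get delivered. */
    static int consume(int messages, int prefetch, boolean ackEachDelivery) {
        ToyBroker broker = new ToyBroker(prefetch);
        for (int i = 0; i < messages; i++) {
            broker.publish("Hello World " + i);
        }
        int delivered = 0;
        long tag;
        while ((tag = broker.deliver()) != -1) {
            delivered++;
            if (ackEachDelivery) {
                broker.ack(tag);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(consume(5, 1, true));  // prints 5: every delivery is acked
        System.out.println(consume(5, 1, false)); // prints 1: the first unacked delivery blocks the rest
    }
}
```

In this model a single missing ack with a prefetch of 1 is enough to stop the consumer, which matches the behaviour suspected above for the container bean.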

You don't need your container bean; instead configure the message listener container factory to set the ack mode. See the documentation.
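A minimal sketch of that setup, assuming Spring Boot supplies the ConnectionFactory and the queue is the "spring-boot" queue from the question. The class names ManualAckConfig and ManualAckConsumer are placeholders, and the factory bean is named rabbitListenerContainerFactory because that is the name @RabbitListener looks up by default.

```java
import java.io.IOException;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Configuration
@EnableRabbit
class ManualAckConfig {

    // Replaces the hand-built SimpleMessageListenerContainer and
    // MessageListenerAdapter beans: the factory creates the container
    // for each @RabbitListener method.
    @Bean
    SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

@Component
class ManualAckConsumer {

    @RabbitListener(queues = "spring-boot")
    public void receiveMessage(String message, Channel channel,
                               @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        // Log before acking so the last in-flight delivery can be identified.
        System.out.println("received '" + message + "' (delivery tag " + tag + ")");
        // multiple = false acks only this delivery, not all earlier ones.
        channel.basicAck(tag, false);
    }
}
```

With this arrangement there is only one listener container for the queue, and the Channel and @Header parameters are resolved by the @RabbitListener infrastructure rather than by MessageListenerAdapter.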

If you are new to spring-amqp, why do you think you need manual acks? The default mode (AUTO) means the container will ack/nack for you (NONE is traditional RabbitMQ auto-ack). It is not common to use manual acks with Spring.