1
votes


I need to create a RabbitMQ messaging system which sends messages to specific users, where one message is always meant for one user.

Every user should have their own, dynamically created, message queue and a DLQ. When a user rejects a message, it should be moved to his DLQ, where it will wait for 10 seconds and then return back to the regular queue.
Every part is working, except that the messages are not removed from the regular queue when accepted or rejected.

Configuration:

spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.default-requeue-rejected=true
spring.rabbitmq.listener.simple.retry.enabled=false

Messaging service:

@Service
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
public class RabbitService {

    public static final String EXCHANGE_NAME = "my-exchange";
    public static final String QUEUE_NAME_PREFIX = "my-queue.";

    private final RabbitTemplate rabbitTemplate;
    private final AmqpAdmin amqpAdmin;

    @Autowired
    public RabbitService(
            RabbitTemplate rabbitTemplate,
            AmqpAdmin amqpAdmin
    ) {
        this.rabbitTemplate = rabbitTemplate;
        this.amqpAdmin = amqpAdmin;
    }

    /**
     * Initializes exchange with name {@link EXCHANGE_NAME} if it does not exist.
     */
    @PostConstruct
    public void init() {
        Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
        amqpAdmin.declareExchange(exchange);
    }

    /**
     * Sends the {@param message} to user with id {@param userId}.
     */
    public void send(@NotNull String message, long userId) {
        String queueName = QUEUE_NAME_PREFIX + userId;

        declareQueuesIfNecessary(queueName);

        rabbitTemplate.convertAndSend(EXCHANGE_NAME, queueName, message);
    }

    /**
     * Declares a new {@link Queue} with the specified {@param queueId} and a DLQ if they do not exist.
     *
     * @param queueId the queue identifier
     */
    private void declareQueuesIfNecessary(@NotNull String queueName) {
        String resendQueueName = "resend." + queueName;

        Map<String, Object> args;

        args = new HashMap<>();

        // Args for the resend queue. It should have a TTL of 10 seconds, after which it will be moved to the regular queue.
        args.put("x-message-ttl", 10000L);
        args.put("x-dead-letter-exchange", EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key", queueName);
        amqpAdmin.declareQueue(new Queue(resendQueueName, true, false, true, args));
        amqpAdmin.declareBinding(new Binding(
                resendQueueName,
                Binding.DestinationType.QUEUE,
                EXCHANGE_NAME,
                resendQueueName,
                null
        ));

        // Args for the regular queue. When messages are rejected, they should move to the resend queue.
        args = new HashMap<>();
        args.put("x-dead-letter-exchange", EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key", resendQueueName);
        amqpAdmin.declareQueue(new Queue(queueName, true, false, true, args));
        amqpAdmin.declareBinding(new Binding(
                queueName,
                Binding.DestinationType.QUEUE,
                EXCHANGE_NAME,
                queueName,
                null
        ));
    }
}

Consumer:

@Component
public class SimpleRabbitConsumer {

    /**
     * Consumes messages for all users and prints them if they are not blank, otherwise throws an exception.
     *@implNote - in this example we don't need the user id
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = RabbitService.EXCHANGE_NAME, type = "topic"),
            key = RabbitService.QUEUE_NAME_PREFIX + "*"
    ))
    public void consume(@NotNull String message) {
        // Let's reject the message it it's blank
        if (StringUtils.isBlank(message)) {
            throw new RuntimeException("Message rejected");
        }

        System.out.println("Message '" + message + "' received.");
    }
}

What doesn't work
When the consumer throws the RuntimeException, the message will not be rejected, instead it will stay in the regular queue.
I'm using the RabbitMQ manager plugin to see the information about the queues.
The message will stay there even if it's consumed correctly.

What works
When the RabbitService#send(...) method is called, it correctly created both the regular and dead-letter queues for the specified user, it he does not yet have them.
The message is then correctly received by the SimpleRabbitConsumer#consume(...), which either prints it or throws the exception.
If the message is acknowledged using the RabbitMQ manager, then it is correctly removed from the queue.
When the message is instead rejected (without requeue), then it is correctly sent to the DLQ, where it will stay for 10 seconds before being moved back.

What I've tried
I've changed the RuntimeException to a AmqpRejectAndDontRequeueException, yet it still behaved the same way, both for spring.rabbitmq.listener.simple.acknowledge-mode auto and manual.
While using the manual acknowledgement, I've also tried to change the consumer method to:

    public void consume(
            @NotNull String message,
            @NotNull Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag
    ) throws IOException {
        // Let's reject the message it it's blank
        if (StringUtils.isBlank(message)) {
            channel.basicReject(tag, false);
        } else {
            System.out.println("Message '" + message + "' received.");
            channel.basicAck(tag, false);
        }
    }

with no avail.

Additional issue
When a queue has unprocessed items while the consumer is shut down, then the messages will not be sent to the consumer when he is restarted.


I understand that the explanation is quite long, but I've spent a long time browsing the internet for solutions without finding any, so I've came here.
I would greatly welcome any suggestion on how to make it work.

Thank you.


EDIT
While debugging, I've found that when the application starts, a queue Q is automatically created for the exchange with binding "my-queue.*".
A channel is also created, which is connected to the consumer and the queue Q, while the user queues have no consumers. Any message that I send seems to be sent to both the queue Q and the user queue, yet it gets removed only from queue Q.

The log is as follows:

2019-07-31 13:51:54.411  WARN 130172 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.myproject.SimpleRabbitConsumer.consume(java.lang.String)' threw exception
...
Caused by: java.lang.RuntimeException: Message rejected

2019-07-31 13:51:54.456 DEBUG 130172 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Rejecting messages (requeue=false)
2019-07-31 13:51:54.456 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.basicNack([1, true, false])
2019-07-31 13:51:54.457 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for message from consumer.
2019-07-31 13:51:54.457 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Retrieving delivery for Consumer@7f5ea4e8: tags=.......
2019-07-31 13:51:55.460 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for message from consumer.
2019-07-31 13:51:55.460 TRACE 130172 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Retrieving delivery for Consumer@7f5ea4e8: tags=.......

Could it be that when I send a message to user with id 1, the message is sent via the topic to both bindings "my-queue.*" and "my-queue.1", yet the consumer only handles the "my-queue.*" and not the "my-queue.1"?

1
@ArnaudClaudel The issue you've mentioned is about the messages not being added back into the queue, but my problem is that they are not being removed. I've tried to use the specified changes though.<br> When using acknowledgemode=AUTO with defaultRequeueRejected=false, then there is no change regardless if the consumer throws an exception or not. Ie. the message is still stuck in the queue. Same for the AmqpRejectAndDontRequeueException. I'll add a separate comment with the log.warozell

1 Answers

0
votes

When the consumer throws the RuntimeException , messages aren't being removed from the queue and stuck in a Exception loop, I assume that is error you are facing, I faced the same Issue while ago and simply Exception Handling Fixed My problem, Have you try that?