I would like to achieve the following scenario in my application:
1. If a business error occurs, the message should be sent from the incoming queue to the dead letter queue and delayed there for 10 seconds.
2. Step 1 should be repeated 3 times.
3. After that, the message should be published to the parking lot queue.
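To make the intended flow concrete, here is a minimal plain-Java simulation of the three steps. It does not talk to RabbitMQ at all; the class name `RetryFlowSketch`, the method `route`, and the constant `MAX_RETRIES` are hypothetical names used only for illustration, and the queue names are just labels.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the desired flow; no broker is involved.
// All names here (RetryFlowSketch, route, MAX_RETRIES) are hypothetical.
final class RetryFlowSketch {

    static final int MAX_RETRIES = 3; // step 2: repeat the delay loop 3 times

    /** Returns the queues a message visits, in order. */
    static List<String> route(boolean businessErrorOccurs) {
        List<String> hops = new ArrayList<>();
        int deadLetterVisits = 0;
        while (true) {
            hops.add("incoming");              // the consumer tries to process the message
            if (!businessErrorOccurs) {
                return hops;                   // success: nothing more to do
            }
            if (deadLetterVisits >= MAX_RETRIES) {
                hops.add("parkingLot");        // step 3: give up and park the message
                return hops;
            }
            hops.add("deadLetter");            // step 1: the message waits 10 seconds here
            deadLetterVisits++;
        }
    }

    public static void main(String[] args) {
        System.out.println(route(true));
    }
}
```

Under this reading, a message that always fails is processed four times in total (the first attempt plus three delayed retries) before it ends up in the parking lot queue.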
I am able (see the code below) to delay the message for a certain amount of time in the dead letter queue, and the message loops infinitely between the incoming queue and the dead letter queue. So far so good.
The main question: How can I intercept this loop and manually route the message to the parking lot queue (as described in step 3) for later analysis?
A secondary question: Can I achieve the same flow with only one exchange?
Here is a shortened version of my two classes:
Configuration class
@Configuration
public class MailRabbitMQConfig {

    @Bean
    TopicExchange incomingExchange() {
        TopicExchange incomingExchange = new TopicExchange(incomingExchangeName);
        return incomingExchange;
    }

    @Bean
    TopicExchange dlExchange() {
        TopicExchange dlExchange = new TopicExchange(deadLetterExchangeName);
        return dlExchange;
    }

    // Rejected messages from the incoming queue are dead-lettered to dlExchange
    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(incomingQueueName)
                .withArgument("x-dead-letter-exchange", dlExchange().getName())
                .build();
    }

    @Bean
    public Queue parkingLotQueue() {
        return new Queue(parkingLotQueueName);
    }

    @Bean
    Binding incomingBinding() {
        return BindingBuilder
                .bind(incomingQueue())
                .to(incomingExchange())
                .with("#");
    }

    // Messages stay here for 10 seconds, then expire back to incomingExchange
    @Bean
    public Queue dlQueue() {
        return QueueBuilder
                .durable(deadLetterQueueName)
                .withArgument("x-message-ttl", 10000)
                .withArgument("x-dead-letter-exchange", incomingExchange().getName())
                .build();
    }

    @Bean
    Binding dlBinding() {
        return BindingBuilder
                .bind(dlQueue())
                .to(dlExchange())
                .with("#");
    }

    @Bean
    public Binding bindParkingLot(
            Queue parkingLotQueue,
            TopicExchange dlExchange
    ) {
        return BindingBuilder.bind(parkingLotQueue)
                .to(dlExchange)
                .with(parkingLotRoutingKeyName);
    }
}
Consumer class
@Component
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
    public Boolean receivedMessage(MailDataExternalTemplate mailDataExternalTemplate) throws Exception {
        try {
            // business logic here
        } catch (Exception e) {
            // Reject without requeue so the broker dead-letters the message; keep the cause for the logs
            throw new AmqpRejectAndDontRequeueException("Failed to handle the business logic", e);
        }
        return Boolean.TRUE;
    }
}
I know I could define an additional listener for the dead letter queue in the Consumer class like this:
@RabbitListener(queues = "${mail.rabbitmq.queue.deadletter}")
public void receivedMessageFromDlq(Message failedMessage) throws Exception {
    // Logic to count x-retries header property value and send a failed message manually
    // to the parkingLot Queue
}
However, this does not work as expected: the listener is called as soon as the message reaches the head of the dead letter queue, so the message is consumed immediately instead of being delayed.
Thank you in advance.
EDIT: With the help of @ArtemBilan and @GaryRussell I was able to solve the problem. The main hints are in their comments on the accepted answer. Thank you guys for the help! Below you will find a new diagram that shows the messaging process, followed by the updated Configuration and Consumer classes. The main changes were:
- The definition of the routes between the incoming exchange -> incoming queue and the dead letter exchange -> dead letter queue in the MailRabbitMQConfig class.
- The loop handling with the manual publishing of the message to the parking lot queue in the Consumer class.
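The loop handling boils down to reading the dead-lettering history that RabbitMQ records in the `x-death` header and comparing it with a retry limit. The following is a minimal plain-Java sketch of that decision, assuming the header is a list of maps whose first entry is the most recent one and carries a numeric `count`. The names `XDeathSketch`, `deathCount` and `shouldPark` are hypothetical and not part of Spring AMQP.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper; it only inspects a plain header map and needs no broker.
final class XDeathSketch {

    /** Returns the "count" of the first x-death entry, or 0 if the message was never dead-lettered. */
    static long deathCount(Map<String, Object> headers) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List)) {
            return 0L;                          // header missing or of an unexpected type
        }
        List<?> entries = (List<?>) raw;
        if (entries.isEmpty() || !(entries.get(0) instanceof Map)) {
            return 0L;
        }
        Object count = ((Map<?, ?>) entries.get(0)).get("count");
        // Accept any Number instead of casting straight to Long
        return count instanceof Number ? ((Number) count).longValue() : 0L;
    }

    /** True once the message has been dead-lettered at least maxRetries times. */
    static boolean shouldPark(Map<String, Object> headers, int maxRetries) {
        return deathCount(headers) >= maxRetries;
    }

    public static void main(String[] args) {
        Map<String, Object> entry = Map.of("count", 3L, "reason", "expired");
        List<Map<String, Object>> xDeath = List.of(entry);
        Map<String, Object> headers = Map.of("x-death", xDeath);
        System.out.println(shouldPark(headers, 3));
    }
}
```

In a listener, the header map would come from `message.getMessageProperties().getHeaders()`. When `shouldPark` returns true, the message is published to the parking lot instead of being rejected again.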
Configuration class
@Configuration
public class MailRabbitMQConfig {

    @Autowired
    public MailConfigurationProperties properties;

    @Bean
    TopicExchange incomingExchange() {
        TopicExchange incomingExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getIncoming());
        return incomingExchange;
    }

    @Bean
    TopicExchange dlExchange() {
        TopicExchange dlExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getDeadletter());
        return dlExchange;
    }

    // Rejected messages go to dlExchange with the dead letter routing key
    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(properties.getRabbitMQ().getQueue().getIncoming())
                .withArgument(
                        properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
                        dlExchange().getName()
                )
                .withArgument(
                        properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
                        properties.getRabbitMQ().getRoutingKey().getDeadLetter()
                )
                .build();
    }

    @Bean
    public Queue parkingLotQueue() {
        return new Queue(properties.getRabbitMQ().getQueue().getParkingLot());
    }

    @Bean
    Binding incomingBinding() {
        return BindingBuilder
                .bind(incomingQueue())
                .to(incomingExchange())
                .with(properties.getRabbitMQ().getRoutingKey().getIncoming());
    }

    // Expired messages return to incomingExchange with the incoming routing key
    @Bean
    public Queue dlQueue() {
        return QueueBuilder
                .durable(properties.getRabbitMQ().getQueue().getDeadLetter())
                .withArgument(
                        properties.getRabbitMQ().getMessages().X_MESSAGE_TTL_HEADER,
                        properties.getRabbitMQ().getMessages().getDelayTime()
                )
                .withArgument(
                        properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
                        incomingExchange().getName()
                )
                .withArgument(
                        properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
                        properties.getRabbitMQ().getRoutingKey().getIncoming()
                )
                .build();
    }

    @Bean
    Binding dlBinding() {
        return BindingBuilder
                .bind(dlQueue())
                .to(dlExchange())
                .with(properties.getRabbitMQ().getRoutingKey().getDeadLetter());
    }

    @Bean
    public Binding bindParkingLot(
            Queue parkingLotQueue,
            TopicExchange dlExchange
    ) {
        return BindingBuilder.bind(parkingLotQueue)
                .to(dlExchange)
                .with(properties.getRabbitMQ().getRoutingKey().getParkingLot());
    }
}
Consumer class
@Component
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @Autowired
    public MailConfigurationProperties properties;

    @Autowired
    protected EmailClient mailJetEmailClient;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
    public Boolean receivedMessage(
            @Payload MailDataExternalTemplate mailDataExternalTemplate,
            Message amqpMessage
    ) {
        logger.info("Received message");
        try {
            final EmailTransportWrapper emailTransportWrapper = mailJetEmailClient.convertFrom(mailDataExternalTemplate);
            mailJetEmailClient.sendEmailUsing(emailTransportWrapper);
            logger.info("Successfully sent an E-Mail");
        } catch (Exception e) {
            int count = getXDeathCountFromHeader(amqpMessage);
            logger.debug("x-death count: {}", count);
            // Retries exhausted: publish to the parking lot and return normally
            // instead of rejecting, which ends the retry loop
            if (count >= properties.getRabbitMQ().getMessages().getRetryCount()) {
                this.rabbitTemplate.send(
                        properties.getRabbitMQ().getExchange().getDeadletter(),
                        properties.getRabbitMQ().getRoutingKey().getParkingLot(),
                        amqpMessage
                );
                return Boolean.TRUE;
            }
            // Reject without requeue so the message is dead-lettered and delayed again
            throw new AmqpRejectAndDontRequeueException("Failed to send an E-Mail", e);
        }
        return Boolean.TRUE;
    }

    private int getXDeathCountFromHeader(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        // No x-death header means the message has never been dead-lettered
        if (headers.get(properties.getRabbitMQ().getMessages().X_DEATH_HEADER) == null) {
            return 0;
        }
        //noinspection unchecked
        ArrayList<Map<String, ?>> xDeath = (ArrayList<Map<String, ?>>) headers
                .get(properties.getRabbitMQ().getMessages().X_DEATH_HEADER);
        Long count = (Long) xDeath.get(0).get("count");
        return count.intValue();
    }