0
votes

I'd like to know what is the best way to handle messages with unknown/invalid routing key values inside an exchange? In my case, I'm sending all my messages inside the same exchange and based on a routing key, messages are routed to the corresponding queue. Here's my configuration (I'm using Spring Cloud Stream) :

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type

spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2

spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR

Now what I'm asking is what happens if I send a message with payload.type='ANY'? Obviously, this message won't be retrieved by any consumer and will remain inside the exchange, but what is the best way to keep track of these "unknown" messages? Can I use a DLQ for this?

Thanks!

2

2 Answers

2
votes

You could use mandatory property for your publishers.

When a published message cannot be routed to any queue, 
and the publisher set the mandatory message property to true, the 
message will be returned to it.
The publisher must have a returned message handler set up in order to 
handle the return (e.g. by logging an error or retrying with a different exchange)
1
votes

will remain inside the exchange,

No; exchanges don't "hold" messages, they are simply routers.

Unroutable messages are discarded by default.

You can configure the binding to return unroutable messages.

See Error Channels.

Returns are asynchronous.

In the upcoming 3.1 release, you can wait on a future to determine whether the message was sent successfully or not. See Publisher Confirms.

If the message is unroutable, the correlation data's returnedMessage property is set.

The framework uses the mandatory feature mentioned in another answer.

EDIT

Here is an example:

spring.rabbitmq.publisher-returns: true

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']

spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
@SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {

    public static void main(String[] args) {
        SpringApplication.run(So65134452Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> {
            output.send(MessageBuilder.withPayload("foo")
                    .setHeader("rk", "invalid")
                    .build());
        };
    }

    @Autowired
    RabbitTemplate template;

    @Bean
    public Queue unroutable() {
        return new Queue("unroutable.messages");
    }

    @ServiceActivator(inputChannel = "errorChannel")
    public void error(Message<?> error) {
        if (error.getPayload() instanceof ReturnedAmqpMessageException) {
            this.template.send(unroutable().getName(),
                    ((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
        }
    }

}