0
votes

How do you consume using Spring Cloud Stream (that is using AMQP) messages sent with MQTT to RabbitMQ? With MQTT on Rabbit, all messages land on an exchange called "amq.topic".

On the consumer side, with Spring Cloud Stream, a queue is created for each destination and an exchange of type "topic" with the name of the destination; the created queue is bound to the exchange.

Now,

  • I cannot manually bind my queue to the "amq.topic", because it is exclusive:

    cannot obtain exclusive access to locked queue '...' in vhost '/'. It could be originally declared on another connection or the exclusive property value does not match that of the original declaration.

  • I cannot listen directly from the exchange "amq.topic", because, well, you have to listen to a queue...

  • I created a "tmp" queue bound to "amq.topic" where messages are delivered, but I cannot use it as destination, because RMQ will create a new queue called "tmp.SOME_CLIENT_ID", bound to an exchange called "tmp", that has nothing to do with my "tmp" queue.

Any idea would be welcome!

1

1 Answers

1
votes

See Using Existing Queues/Exchanges.

By default, the binder will automatically provision a topic exchange with the name being derived from the value of the destination binding property <prefix><destination>. The destination defaults to the binding name, if not provided. When binding a consumer, a queue will automatically be provisioned with the name <prefix><destination>.<group> (if a group binding property is specified), or an anonymous, auto-delete queue when there is no group. The queue will be bound to the exchange with the "match-all" wildcard routing key (#) for a non-partitioned binding or <destination>-<instanceIndex> for a partitioned binding. The prefix is an empty String by default. If an output binding is specified with requiredGroups, a queue/binding will be provisioned for each group.

There are a number of rabbit-specific binding properties that allow you to modify this default behavior.

If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named myExchange and the queue is named myQueue:

spring.cloud.stream.bindings.<binding name>.destination=myExhange

spring.cloud.stream.bindings.<binding name>.group=myQueue

spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false

spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false

spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true

If you want the binder to provision the queue/exchange, but you want to do it using something other than the defaults discussed here, use the following properties. Refer to the property documentation above for more information.

spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey

spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>

spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'