6
votes

I've created using the RabbitMQ web-UI a topic exchange TX and bind to the exchange two queues TX.Q1 and TX.Q2, each binded with routing-keys rk1 and rk2 accordingly, and produced few messages to the exchange.

Now I want to create a consumer using Spring Cloud Stream that will take messages from Q1 only. I tried using configuration:

spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1

and the annotation @StreamListner(Sink.INPUT) for the method that consumes messages.

As result I can see that the consumer has created a queue (or binding) with the same name TX.Q1 but the Routing-Key of the new queue/bind is #.
How can I configure via Spring Cloud Stream a consumer that will consume messages from the predifined queue (only that routed with rk1).

4

4 Answers

5
votes

So for now, the work-around that Garry Russell suggested has solved the issue for me.

I've used @RabbitListener instead of @StreamListenet this way:
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "TX.Q1", durable = "true"), exchange = @Exchange(value = "TX", type = "topic", durable = "true"), key = "rk1").

As a result, the predefined queue TX.Q1 is bind with binding key : rk1 to the exchange TX.

Waiting for updates on the Spring Cloud Steream issue.

2
votes

I think I found the solution using the @StreamListener, not using the workaround. Everything is made in the configuration, not in the code.

The configuration I used is the following (it's in .yml, but you can easly translate it in .properties):

spring:
  cloud:
    stream:
      bindings:
        input:
          binder: <binder_name>
          destination: TX
          group: Q1
      binders:
        <binder_name>:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: <port>
                virtual-host: <vhost>
                username: <username>
                password: <password>
      rabbit:
        bindings:
          input:
            consumer:
              binding-routing-key: rk1
              exchange-name: TX
              queue-name-group-only: true
              bind-queue: true
              exchange-durable: true
              exchange-type: topic

Using this approach, you don't have to write a particular code to let the RabbitMQ consumer connect to your cluster, this should solve your case.

Hope this helps.

0
votes

Spring Cloud Stream sets the router key internally for the consumer endpoint to be either the destination name (exchange name) itself or the routing based on the partition header in case of static partitioning.

I think this github issue might be relevant to your case.

0
votes

It is encouraged to use this property under consumer to enable rabbit to consume from existing queue. Please note that the queue name will be picked from the group property only and not from destination.

queueNameGroupOnly: true

Example:

cloud:
stream:
  # rabbit setting: https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
  rabbit:
    bindings:
      input:
        consumer:
          acknowledgeMode: AUTO
          bindingRoutingKey: DECISION_PERSISTENCE_KEY
          declareExchange: false
          bindQueue: false
          queueNameGroupOnly: true
          consumerTagPrefix: dpa-rabbit-consumer
  bindings:
    input:
      binder: rabbit
      group: DECISION_PERSISTENCE_QUEUE
      content-type: application/json