2
votes

I have a Producer producing messages in a RabbitMQ queue by using a direct exchange.

queue name: TEMP_QUEUE, exchange name: TEMP_DIRECT_EXCHANGE

Producing to this queue is easy since on my producer application I use Spring AMQP which I am familiar with.

On my Consumer application, I need to use Spring cloud stream version 3.0+.

I want to avoid using legacy annotations like @EnableBinding, @StreamListener because they are about to be depracated.

Legacy code for my application would look like that :

@EnableBinding(Bindings.class)
public class TempConsumer {

    @StreamListener(target = "TEMP_QUEUE")
    public void consumeFromTempQueue(MyObject object) {
        // do stuff with the object
    }
}

public interface Bindings {
    @Input("TEMP_QUEUE")
    SubscribableChannel myInputBinding();
}

From their docs I have found out I can do something like that

@Bean
public Consumer<MyObject> consumeFromTempQueue() {
    return obj -> {
        // do stuff with the object
    };
}

It is not clear to me how do I specify that this bean will consume from TEMP_QUEUE? Also what if I want to consume from multiple queues?

2

2 Answers

2
votes

See Consuming from Existing Queues/Exchanges.

You can consume from multiple queues with

spring.cloud.stream.bindings.consumeFromTempQueue-in-0.destination=q1,q2,q3
spring.cloud.stream.bindings.consumer.multiplex=true

Without multiplex you'll get 3 bindings; with multiplex, you'll get 1 listener container listening to multiple queues.

1
votes

You need to use the application.yml to bind your bean.

spring.cloud.stream:
  function.definition: consumeFromTempQueue

You can use this configuration to configure source, process and sink as well. In your case you are just using a source.

You can read this post for more information.