3
votes

I have an app that's using Spring Boot 2.0 with WebFlux and has an endpoint returning a Flux of ServerSentEvent. The events are created by leveraging spring-amqp to consume messages off a RabbitMQ queue. My question is: how do I best bridge the MessageListener's configured listener method to a Flux that can be passed up to my controller?

The create section of the Project Reactor reference mentions that it "can be very useful to bridge an existing API with the reactive world - such as an asynchronous API based on listeners", but I'm unsure how to hook into the message listener directly, since it's wrapped in the DirectMessageListenerContainer and MessageListenerAdapter. Here is the example from the create section:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( 
      new MyEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }
    });
});

So far, the best option I have found is to create a Processor and simply call onNext() on it from the RabbitMQ listener method to manually produce each event.
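
Roughly, this is a minimal sketch of that Processor-based bridge; the EmitterProcessor and the "events" queue name are placeholders for illustration, not what I actually have:

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

// Sketch of the manual bridge: the listener method pushes each AMQP message
// into a processor, and the controller can return that processor as a Flux.
public class ManualBridge {

    private final EmitterProcessor<String> processor = EmitterProcessor.create();

    @RabbitListener(queues = "events") // queue name is illustrative
    public void onMessage(String message) {
        this.processor.onNext(message);
    }

    public Flux<String> events() {
        return this.processor;
    }
}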


2 Answers

7
votes

I have something like this:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.publisher.TopicProcessor;
import reactor.util.concurrent.Queues;

@SpringBootApplication
@RestController
public class AmqpToWebfluxApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(AmqpToWebfluxApplication.class, args);

        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        // Publish some test messages to the "foo" queue on startup.
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("foo", "event-" + i);
        }
    }

    // Shared processor: accepts emissions from several listener container threads
    // and fans them out to every current SSE subscriber.
    private TopicProcessor<String> sseFluxProcessor = TopicProcessor.share("sseFromAmqp", Queues.SMALL_BUFFER_SIZE);

    @GetMapping(value = "/sseFromAmqp", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getSseFromAmqp() {
        return this.sseFluxProcessor;
    }

    @RabbitListener(id = "fooListener", queues = "foo")
    public void handleAmqpMessages(String message) {
        this.sseFluxProcessor.onNext(message);
    }

}

The TopicProcessor.share() allows many concurrent subscribers, which is what we get when we return this TopicProcessor as a Flux from our /sseFromAmqp REST endpoint via WebFlux.

The @RabbitListener just delegates its received messages to that TopicProcessor.

In the main() I have code to confirm that I can publish to the TopicProcessor even when there are no subscribers.

Tested with two separate curl sessions and with messages published to the queue via the RabbitMQ Management Plugin.

By the way, I use share() because of this note in the reference documentation: https://projectreactor.io/docs/core/release/reference/#_topicprocessor

from multiple upstream Publishers when created in the shared configuration

That's because the @RabbitListener really can be called from different ListenerContainer threads, concurrently.

UPDATE

Also, I have moved this sample to my sandbox: https://github.com/artembilan/sendbox/tree/master/amqp-to-webflux

3
votes

Let's suppose you want a single RabbitMQ listener that somehow puts messages into one or more Fluxes. Flux.create is indeed a good way to create such a Flux.

Let's start with the Messaging with RabbitMQ Spring guide and try to adapt it.

The original Receiver has to be modified so that it can put received messages into a FluxSink.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.springframework.stereotype.Component;

import reactor.core.publisher.FluxSink;

@Component
public class Receiver {

    /**
     * Collection of sinks enables more than one subscriber.
     * Keep in mind that the FluxSink instance the emitter works with is provided per subscriber.
     * CopyOnWriteArrayList makes it safe to add sinks (on new subscriptions) and to remove
     * cancelled sinks while receiveMessage() iterates over the collection.
     */
    private final List<FluxSink<String>> sinks = new CopyOnWriteArrayList<>();

    /**
     * Adds a sink to the collection. From now on, new messages will be put into the sink.
     * Called whenever a new Flux created by Flux.create is subscribed to.
     */
    public void addSink(FluxSink<String> sink) {
        sinks.add(sink);
    }

    public void receiveMessage(String message) {
        sinks.forEach(sink -> {
            if (!sink.isCancelled()) {
                sink.next(message);
            } else {
                // A sink is cancelled when its subscriber cancels the subscription;
                // stop emitting to it and drop it from the collection.
                sinks.remove(sink);
            }
        });
    }
}

Now we have a receiver that puts RabbitMQ messages into the sinks. Creating a Flux is then rather simple.

import org.springframework.stereotype.Component;

import reactor.core.publisher.Flux;

@Component
public class FluxFactory {

    private final Receiver receiver;

    public FluxFactory(Receiver receiver) { this.receiver = receiver; }

    // The FluxSink is handed to the Receiver when a subscriber subscribes to the created Flux.
    public Flux<String> createFlux() {
        return Flux.create(receiver::addSink);
    }
}

The Receiver bean is autowired into the factory. Of course, you don't have to create a special factory; this only demonstrates the idea of how to use the Receiver to create the Flux.
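
For completeness, a hypothetical WebFlux controller (the class and mapping names here are mine, not from the guide) could expose the created Flux as a server-sent event stream, matching the endpoint described in the question:

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class SseController {

    private final FluxFactory fluxFactory;

    public SseController(FluxFactory fluxFactory) {
        this.fluxFactory = fluxFactory;
    }

    // Each request gets its own Flux and therefore its own FluxSink registered in the Receiver.
    @GetMapping(value = "/sseFromAmqp", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> sseFromAmqp() {
        return this.fluxFactory.createFlux();
    }
}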

The rest of the application from the Messaging with RabbitMQ guide may stay the same, including the bean instantiation.

@SpringBootApplication
public class Application {
    ...
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, 
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
    ...
}
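
The guide also declares the queue itself (plus an exchange and binding) as beans. If you are starting without the guide, a minimal queue declaration might look like the following sketch; the "spring-boot" name is only a placeholder and must match whatever queueName holds:

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {

    // Declares the queue the listener container consumes from.
    // The name is illustrative and must match the container's queueName.
    @Bean
    Queue queue() {
        return new Queue("spring-boot", false);
    }
}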

I used a similar design to adapt the Twitter streaming API successfully, though there may be a nicer way to do it.