
I have a Spring Cloud Stream application that receives events from RabbitMQ using the Rabbit Binder. My application can be summarized as follows:

@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
    // Transform events and store them in MongoDB using
    // spring-boot-starter-data-mongodb-reactive
    ...
}

The problem is that @Transactional doesn't seem to work with Spring Cloud Stream (or at least that's my impression): if an exception is thrown while writing to MongoDB, the event has already been acked to RabbitMQ, and the operation is not retried.

Given that I want to achieve essentially the same behavior as putting @Transactional around a method with spring-amqp:

  1. Do I have to manually ACK the messages to RabbitMQ when using Spring Cloud Stream with the Rabbit Binder?
  2. If so, how can I achieve this?

1 Answer


There are several issues here.

  1. Transactions are not required for acknowledging messages.
  2. Reactor-based @StreamListener methods are invoked exactly once, just to set up the Flux, so @Transactional on that method is meaningless. Messages then flow through the flux, so anything pertaining to an individual message has to be done within the context of the flux (see the sketch after this list).
  3. Spring transactions are bound to the current thread, but Reactor is non-blocking, so the message will be acked at the first thread handoff.
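
To make point 2 concrete, here is a sketch in which the method body runs a single time, when the binder wires up the pipeline, so a transaction opened there could never span an individual message; only the operators attached to the flux execute per message. (The subscribe() call reflects my understanding that nothing else consumes a void-returning reactive listener.)

@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
    // Executes exactly once, at set-up time; an @Transactional here
    // would wrap only this call, not the processing of any message.
    events
        .doOnNext(event -> {
            // Executes once per message - per-message logic belongs here
        })
        .subscribe();
}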

Yes, you need to use manual acks, presumably based on the result of the MongoDB store operation. You would need to consume a Flux<Message<Event>> so that you have access to the channel and delivery tag headers.
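
A minimal sketch of that approach, assuming the consumer binding is switched to manual acknowledgements (spring.cloud.stream.rabbit.bindings.<input>.consumer.acknowledge-mode=MANUAL) and that transformAndSave is a placeholder for your reactive MongoDB write:

import java.io.IOException;
import java.io.UncheckedIOException;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Message<Event>> messages) {
    messages
        .flatMap(message -> {
            // Populated by the binder when acknowledge-mode is MANUAL
            Channel channel = message.getHeaders()
                    .get(AmqpHeaders.CHANNEL, Channel.class);
            Long deliveryTag = message.getHeaders()
                    .get(AmqpHeaders.DELIVERY_TAG, Long.class);
            return transformAndSave(message.getPayload()) // hypothetical reactive Mongo write returning Mono
                    .doOnSuccess(saved -> basicAck(channel, deliveryTag))
                    .onErrorResume(ex -> {
                        basicNack(channel, deliveryTag); // requeue so the delivery is retried
                        return Mono.empty();
                    });
        })
        .subscribe();
}

private void basicAck(Channel channel, Long deliveryTag) {
    try {
        channel.basicAck(deliveryTag, false);
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

private void basicNack(Channel channel, Long deliveryTag) {
    try {
        channel.basicNack(deliveryTag, false, true); // requeue = true
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

The basicNack with requeue=true gives you redelivery on failure, which is the retry behavior the transaction would otherwise have provided.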