I have a Spring Cloud Stream application that receives events from RabbitMQ using the Rabbit Binder. My application can be summarized as follows:
@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
    // Transform events and store them in MongoDB using
    // spring-boot-starter-data-mongodb-reactive
    ...
}
The problem is that @Transactional does not seem to work with Spring Cloud Stream (or at least that's my impression): if an exception occurs while writing to MongoDB, the event appears to have already been acked to RabbitMQ, and the operation is not retried.
Given that I want to achieve basically the same behavior as when using @Transactional on a method with spring-amqp:
- Do I have to manually ACK the messages to RabbitMQ when using Spring Cloud Stream with the Rabbit Binder?
- If so, how can I achieve this?