I am using Spring AMQP (RabbitMQ implementation) and I am trying to propagate a single transaction into multiple threads.
As an example, Let's say there are 3 queues, names X,Y,Z and first I'm obtaining a message from queue X using thread-1, and next,that message is given to thread-0, and within thread-0 message is cloned and sent to queue Y via, thread-2 and queue Z via thread-3. Thread-0 wait for the completion of both thread-3 and thread-4, to commit or rollback message. Note that's I'm using 4 threads here.
What I want is basically to handle this 3 operations,(getting message and putting it to two queues) as a single transaction. i.e. if I successfully send the message to queue Y, but fails to send it to Z, then the message sent to Y will be rollbacked and the original message will be rolled back into queue X as well.
Up to now, I have managed to propegate transaction information via threadLocals (mainly TransactionStatus and TransactionSynchronizationManager.resources) and I was able to bind these 3 operations into one transaction.
But my problem is with sending ACK/NACK to original queue X, even though I commit/rollback transaction, it only works for queues Y and Z only. The message obtained from X is always in Unacked state.
I have tries channel.basicAck(), RabbitUtils.commitIfNecessary(), approaches, but no success.
Note that I have enabled channelTransacted as well. Any help is highly appreciated.