1
votes

I am using Spring AMQP (RabbitMQ implementation) and I am trying to propagate a single transaction into multiple threads.

As an example, Let's say there are 3 queues, names X,Y,Z and first I'm obtaining a message from queue X using thread-1, and next,that message is given to thread-0, and within thread-0 message is cloned and sent to queue Y via, thread-2 and queue Z via thread-3. Thread-0 wait for the completion of both thread-3 and thread-4, to commit or rollback message. Note that's I'm using 4 threads here.

What I want is basically to handle this 3 operations,(getting message and putting it to two queues) as a single transaction. i.e. if I successfully send the message to queue Y, but fails to send it to Z, then the message sent to Y will be rollbacked and the original message will be rolled back into queue X as well.

Up to now, I have managed to propegate transaction information via threadLocals (mainly TransactionStatus and TransactionSynchronizationManager.resources) and I was able to bind these 3 operations into one transaction.

But my problem is with sending ACK/NACK to original queue X, even though I commit/rollback transaction, it only works for queues Y and Z only. The message obtained from X is always in Unacked state.

I have tries channel.basicAck(), RabbitUtils.commitIfNecessary(), approaches, but no success.

Note that I have enabled channelTransacted as well. Any help is highly appreciated.

1
At commit time, thread 1 is active ? Maybe dead, it is wait something ? - Mr_Thorynque
At commit time thread-1 is returned to the pool (all of its thread locals are cleared). Commit is done by thread-0. I have updated the question. - Sajith Dilshan
Maybe set txSize to 1 for your test only or send more messages than txSize during test. - Mr_Thorynque
By default, txSize is set to 1 - Sajith Dilshan

1 Answers

1
votes

Spring transactions are bound to a single thread. You might be able to get it to work using the RabbitMQ client directly - but you would have to share the same channel across all the threads. However, the RabbitMQ documentation strongly advises against doing that:

Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire. ...

In any case, even if you do the work on a single thread, RabbitMQ transactions are rather weak and don't guarantee much; see broker semantics.

AMQP guarantees atomicity only when transactions involve a single queue, i.e. all the publishes inside the tx get routed to a single queue and all acks relate to messages consumed from the same queue. ...