
I've recently been investigating Spring Integration and AMQP (RabbitMQ), as I need two applications (a middleware and a backend) to communicate asynchronously, so that the middleware doesn't block when receiving client calls.

I first took the simpler approach of implementing this synchronously: I have a gateway interface and an outbound gateway (with requiresReply=true) on the middleware, and an inbound gateway and a service activator on the backend. This initial approach works well (I've used Spring Integration XML config).

Now I need clarification on the approach to follow to make this work in an async way.

According to RabbitMQ Tutorial 6, it's better to work with a callback queue and a correlationId. As I understand it, this would be similar to calling Spring RabbitTemplate's convertAndSend() and then receive(), instead of convertSendAndReceive() (which blocks until the response is received).
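The callback-queue pattern from that tutorial boils down to keeping a map from correlationId to a pending result. Below is a minimal plain-Java sketch with no RabbitMQ involved; the class `PendingReplies` and its methods are hypothetical names used only for illustration, and a real client would call `onReply` from whatever consumes the reply queue.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks requests that are still waiting for a reply.
class PendingReplies {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the sender: returns the correlationId to attach to the outgoing message.
    String register(CompletableFuture<String> future) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, future);
        return correlationId;
    }

    // Called by the reply-queue consumer: completes the matching future, if any.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // unknown or already-answered request; the reply is dropped
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) throws Exception {
        PendingReplies replies = new PendingReplies();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        String firstId = replies.register(first);
        String secondId = replies.register(second);

        // Replies may arrive in any order; the correlationId routes each one.
        replies.onReply(secondId, "reply-2");
        replies.onReply(firstId, "reply-1");

        System.out.println(first.get() + ", " + second.get());
    }
}
```

The sender never blocks here: it hands the future to its caller and moves on, and the reply consumer completes that future later.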

I've checked the Spring Integration docs, which say I need to change the gateway interface on the middleware so that it returns a Future or ListenableFuture.

Async Gateway

With that done, I also looked at the documentation for the outbound gateway, which says that it can work together with the RabbitTemplate to manage the correlationId and replyTo message attributes.

My questions are:

  1. To make this work asynchronously, should I keep working with outbound/inbound gateways instead of outbound/inbound channel adapters?
  2. If I follow the outbound/inbound channel adapter approach (which sounds similar to what the RabbitMQ tutorial shows), how do I associate the Future on the gateway interface with the result coming back from the inbound channel adapter?
I forgot to mention that I've already checked another entry, which points me to the RabbitTemplate docs. - Franco Guidoli

1 Answer


To be honest, you haven't stated the original business requirement. There may be no real reason to deal with this async hand-off at all: your entry point is a @Gateway, which holds no thread of its own, so even when it blocks waiting for the reply it doesn't affect other threads performing similar sendAndReceive operations. In most cases it is enough to do everything on the requesting thread rather than lose performance by shifting work to a shared ThreadPoolExecutor.

That said, a Future does free the caller a bit, so that the same thread is ready to accept new requests.
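To make the trade-off concrete, here is a plain-Java sketch rather than Spring Integration code. `sendAndReceive` is a hypothetical stand-in for the blocking request/reply round trip, and the async variant simply moves that same blocking call onto a shared pool and returns a Future.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class GatewayStyles {
    // Hypothetical stand-in for a blocking request/reply round trip.
    static String sendAndReceive(String request) {
        return "reply to " + request;
    }

    // Synchronous style: the caller's thread waits for the reply.
    static String callBlocking(String request) {
        return sendAndReceive(request);
    }

    // Async style: the caller gets a Future right away; a pool thread does the waiting.
    static Future<String> callAsync(ExecutorService pool, String request) {
        Callable<String> task = () -> sendAndReceive(request);
        return pool.submit(task);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(callBlocking("a"));
            Future<String> pending = callAsync(pool, "b");
            // The caller is free to do other work here before asking for the result.
            System.out.println(pending.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that the async variant still blocks a thread while waiting; it only changes which thread that is, which is why the hand-off is not automatically a performance win.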

Since it is a MessagingGateway and you want a reply anyway, there is a hook associated with the request: the TemporaryReplyChannel carried in the replyChannel header. That's why the <outbound-gateway> works properly: it places its blocking reply on that channel for the gateway's return (or for FutureTask#set()).

I'd say we can get the same TemporaryReplyChannel benefit with your async reply requirement.

  1. Use an inbound/outbound channel adapter pair.
  2. Before sending the message to the <int-amqp:outbound-channel-adapter>, apply <header-channels-to-string> in a <header-enricher>.
  3. The server side can stay the same: <int-amqp:inbound-gateway>.
  4. Set a fixed replyQueue as a header on the messages you send through the <int-amqp:outbound-channel-adapter>.
  5. Configure the <int-amqp:inbound-channel-adapter> for that fixed replyQueue.
  6. Both the <int-amqp:outbound-channel-adapter> on the client side and the <int-amqp:inbound-gateway> must be configured with mapped-request-headers="*" so that the reply-channel header is propagated to the server and vice versa.
  7. The <int-amqp:inbound-channel-adapter> on the client side will then just send the reply to the reply-channel, as the <int-amqp:outbound-gateway> does.
  8. You may need to handle the correlationId manually, since the <int-amqp:inbound-gateway> may require it to produce a reply properly.
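The core of steps 2 and 7 can be sketched in plain Java, independent of Spring. Before the message leaves the JVM, the live reply channel is swapped for a string name that can survive AMQP header mapping, and the inbound side resolves that name back to the channel. `ReplyChannelRegistry`, the `replyChannel` key used in the header maps, and the CompletableFuture standing in for the gateway's reply channel are all assumptions made for illustration, not the framework's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: maps string names to live reply "channels".
class ReplyChannelRegistry {
    static final String REPLY_CHANNEL = "replyChannel"; // header key assumed for this sketch

    private final Map<String, CompletableFuture<String>> channels = new ConcurrentHashMap<>();

    // Outbound side: replace the live channel object with a string name.
    Map<String, Object> channelToString(Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        Object channel = copy.get(REPLY_CHANNEL);
        if (channel instanceof CompletableFuture) {
            String name = UUID.randomUUID().toString();
            @SuppressWarnings("unchecked")
            CompletableFuture<String> live = (CompletableFuture<String>) channel;
            channels.put(name, live);
            copy.put(REPLY_CHANNEL, name);
        }
        return copy;
    }

    // Inbound side: resolve the name echoed back by the server and deliver the reply.
    boolean deliverReply(Map<String, Object> replyHeaders, String payload) {
        Object name = replyHeaders.get(REPLY_CHANNEL);
        CompletableFuture<String> live = (name instanceof String) ? channels.remove(name) : null;
        if (live == null) {
            return false; // no such channel, or the reply was already delivered
        }
        return live.complete(payload);
    }

    public static void main(String[] args) throws Exception {
        ReplyChannelRegistry registry = new ReplyChannelRegistry();
        CompletableFuture<String> gatewayReply = new CompletableFuture<>();

        Map<String, Object> headers = new HashMap<>();
        headers.put(REPLY_CHANNEL, gatewayReply);

        // What would travel over the wire: only the string name, not the object.
        Map<String, Object> wireHeaders = registry.channelToString(headers);

        // Pretend the server echoed the headers back together with its reply.
        registry.deliverReply(wireHeaders, "done");
        System.out.println(gatewayReply.get());
    }
}
```

This is also why step 6 matters: if the header carrying the name is not mapped in both directions, the inbound side has nothing to resolve.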

Well, something like that...

HTH

Feel free to ask more questions. Or correct me if I misunderstood your question.