0
votes

I am learning about Spring-Integration and have a basic understanding about Gateway and Service-Activators. I love the concept of Gateway. Spring Integration generates the proxy for gateway at run-time. This proxy hides all the messaging details from the consumer of the gateway. In addition, the generated proxy might also be co-relating request and reply.

With the objective of learning, I set out to implement request and reply correlation using raw Spring Integration features and not using Gateway. I am able to set the correlation identifier in the request header, but not able to specify correlation identifier while receiving reply for the channel. The following (at the end of the question) is the code snippet for the same. Also how does the correlation stuff works against a message broker (e.g. RabbitMQ)? Does RabbitMQ provides an ability to retrieve a message with a specific header (correlation identifier) in it?

public class RemoteProxyCalculatorService implements CalculatorService
{
    public int Square(int n) 
    {
        UUID uuid = SendRequest(n, "squareRequestChannel");
        int squareOfn = ReceiveReply("squareReplyChannel", uuid);
        return squareOfn;
    }

    private <T> UUID SendRequest(T payload, String requestChannel)
    {
        UUID requestID = UUID.randomUUID();
        Message<T> inputMessage = MessageBuilder.withPayload(payload)
                .setCorrelationId(requestID)
                .build();

        MessageChannel channel = (MessageChannel)context.getBean(requestChannel, MessageChannel.class);
        channel.send(inputMessage);
        return requestID;
    }

    @SuppressWarnings("unchecked")
    private <T> T ReceiveReply(String replyChannel, UUID requestID)
    {
        //How to consume requestID so as to receive only the reply related to the request posted by this thread
        PollableChannel channel = (PollableChannel)context.getBean(replyChannel);
        Message<?> groupMessage = channel.receive();
        return (T)groupMessage.getPayload();
    }

    private ClassPathXmlApplicationContext context;
}

Thanks.

3

3 Answers

1
votes

The simplest way to correlate within an app doesn't even require a correlationId header. Instead you can create a QueueChannel instance (that you don't share) and provide that as s the replyChannel header on the Message you send. Whatever downstream component ultimately responds, it will find that header in the Message.

Regarding RabbitMQ, our outbound-gateway simply applies a similar technique, but using the replyTo property of the AMQP Message.

Hope that helps.

-Mark

1
votes

Problem is with common reply channel. The solution (Mark suggested the similar) will look like this.

public class RemoteProxyCalculatorService
{
  public int Square(int n)
  {
    PollableChannel replyChannel = SendRequest(n, "squareRequestChannel");
    int squareOfn = ReceiveReply(replyChannel);
    return squareOfn;
  }

  private <T> PollableChannel SendRequest(T payload, String requestChannel)
  {
    UUID requestID = UUID.randomUUID();
    QueueChannel replyQueueChannel = new QueueChannel();
    Message<T> inputMessage = MessageBuilder.withPayload(payload)
        .setCorrelationId(requestID)
        .setReplyChannel(replyQueueChannel)
        .build();
    MessageChannel channel = context.getBean(requestChannel, MessageChannel.class);
    channel.send(inputMessage);
    return replyQueueChannel;
  }

  @SuppressWarnings("unchecked")
  private <T> T ReceiveReply(PollableChannel replyChannel)
  {
    Message<?> groupMessage = replyChannel.receive();
    return (T) groupMessage.getPayload();
  }

  private ClassPathXmlApplicationContext context;
}
0
votes

If you want to use common reply channel then I think this is what you are looking for.

public class RemoteProxyCalculatorService
{
  public int Square(int n)
  {
    PollableChannel replyChannel = SendRequest(n, "squareRequestChannel");
    int squareOfn = ReceiveReply(replyChannel);
    return squareOfn;
  }

  private <T> PollableChannel SendRequest(T payload, String requestChannel)
  {
    UUID requestID = UUID.randomUUID();
    Message<T> inputMessage = MessageBuilder.withPayload(payload)
        .setCorrelationId(requestID)
        .setReplyChannel(myMessageHandler.getSubscribedChannel())
        .build();

    // Create a Pollable channel for two things

    // 1. Pollable channel is where this thread should look for reply.
    QueueChannel replyQueueChannel = new QueueChannel();

    // 2. Message Handler will send reply to this Pollable channel once it receives the reply using correlation Id.
    myMessageHandler.add(requestID, replyQueueChannel);

    MessageChannel channel = context.getBean(requestChannel, MessageChannel.class);
    channel.send(inputMessage);

    return replyQueueChannel;
  }

  @SuppressWarnings("unchecked")
  private <T> T ReceiveReply(PollableChannel replyChannel)
  {
    Message<?> groupMessage = replyChannel.receive();
    return (T) groupMessage.getPayload();
  }

  private ClassPathXmlApplicationContext context;

  @Autowired
  private MyMessageHandler myMessageHandler;
}

/**
 * Message Handler
 * 
 */
public class MyMessageHandler implements MessageHandler
{
  private final Map<Object, MessageChannel> idChannelsMap = new TreeMap<>();
  private final Object lock = new Object();
  private final SubscribableChannel subscribedChannel;

  public MyMessageHandler(SubscribableChannel subscribedChannel)
  {
    this.subscribedChannel = subscribedChannel;
  }

  @Override
  public void handleMessage(Message<?> message) throws MessagingException
  {
    synchronized (lock)
    {
      this.idChannelsMap.get(message.getHeaders().getCorrelationId()).send(message);
      this.idChannelsMap.remove(message.getHeaders().getCorrelationId());
    }
  }

  public void add(Object correlationId, MessageChannel messageChannel)
  {
    synchronized (lock)
    {
      this.idChannelsMap.put(correlationId, messageChannel);
    }
  }

  public SubscribableChannel getSubscribedChannel()
  {
    return subscribedChannel;
  }

}