0
votes

Problem statement:

Spring amqp-outbound gateway to produce reply from a different thread (Like jms-outbound gateway, having different queue, correlate the request/response using correlation key).

Unable to correlate the message with this example.

Spring integration

    <int:gateway id="outboundGateway" service-interface="com.amqp.outbound.gateway.OutboundGateway"     
                        default-reply-channel="defaultReplyChannel" >
        <int:method name="process"   request-channel="inboundRequestChannel"/>
    </int:gateway>

    <int:channel id="defaultReplyChannel"/>
    <int:channel id="inboundRequestChannel"/>
    <int:channel id="enrichedInboundRequestChannel"/>
    <int:channel id="processAuthRequestChannel"/>
    <int:channel id="postProcessorChannel"/>

    <int:chain input-channel="inboundRequestChannel" output-channel="enrichedInboundRequestChannel">
        <int:service-activator id="serviceActivator"
                       ref="ouboundService"  method="createRequest"/>
    </int:chain>

    <int-amqp:outbound-gateway id="outboundGtwyId" header-mapper="headerMapper"
                        request-channel="enrichedInboundRequestChannel"
                        reply-channel="defaultReplyChannel"
                        amqp-template="template" 
                        reply-timeout="30000" 
                        exchange-name="request_exchange" 
                        routing-key="request_exchange_queue"/>

    <int-amqp:inbound-channel-adapter id="amqpMessageDriven"  queue-names="request_queue" 
                                 connection-factory="rabbitConnectionFactory"  channel="processAuthRequestChannel"/>

    <int:service-activator id="serviceActivator"
                       ref="ouboundService" input-channel="processAuthRequestChannel" output-channel="postProcessorChannel"
                       method="processRequest"/>

    <int-amqp:outbound-channel-adapter amqp-template="template" channel="postProcessorChannel" 
            header-mapper="headerMapper" exchange-name="reply_exchange" routing-key="reply_exchange_queue"/>

    <bean id="headerMapper" class="org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper"/>

Config

@Bean
public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){
    final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
    template.setQueue("reply_queue");
    return template;
}



@Bean
public Binding binding(){
    return BindingBuilder.bind(this.queue()).to(this.exchange()).with("request_exchange_queue");
}

@Bean
public DirectExchange exchange(){
    return new DirectExchange("request_exchange");
}

@Bean
public Queue queue(){
    return new Queue("request_queue", true, false, true);
}

@Bean
public Binding bindingReply(){
    return BindingBuilder.bind(this.queue()).to(this.exchange()).with("reply_exchange_queue");
}

@Bean
public DirectExchange exchangeReply(){
    return new DirectExchange("reply_exchange");
}


@Bean
public Queue replyQueue(){
    return new Queue("reply_queue", true, false, true);
}

Service

@Service
public final class OuboundService {


    public Message createRequest(String message){
        System.out.println("Inside createRequest : "+ message);
        final String transactionId = UUID.randomUUID().toString();
        final Message builtMessage = MessageBuilder.withBody(message.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setHeader(AmqpHeaders.CORRELATION_ID, transactionId)
                .build();
        return builtMessage;
    }


    public Message processRequest(Message message){
        System.out.println("Inside process Request : "+ new String(message.getBody()));
        System.out.println("Header values : "+message.getMessageProperties().getHeaders());
        final Message result = MessageBuilder.withBody("Successful".getBytes()).copyProperties(message.getMessageProperties())
                                .copyHeaders(message.getMessageProperties().getHeaders()).build();
        return result;
    }

}

Error:

org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'outboundGtwyId', and its 'requiresReply' property is set to true.

GitHub source code (Resolved Solution)

https://github.com/kingkongprab/spring-amqp-outbound-gateway

1

1 Answers

0
votes

The correlation is done in the Spring AMQP as well. See its RabbitTemplate#sendAndRecevie() for more info. Also there is a good documentation on the matter in the Reference Manual.

Spring Integration with its AbstractAmqpOutboundEndpoint and AmqpInboundGateway implementations provides out-of-the-box request-reply correlation solution. If you are not able to use AmqpInboundGateway on the server side, you should ensure the correlationId transfer from received request to the reply to send back. Yes, you can use dedicated exchange for replies and that is what supported by the RabbitTemplate#setQueue() to wait for replies on the client, outbound side. But that still isn't going to work without proper correlation transferring. Also see https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-message-headers for the info how headers (including correlationId) are mapped in Spring Integration.

UPDATE

Thank you for sharing your application.

Well, now I see several problems:

  1. You are definitely missing the replyQueue binding:

    @Bean
    public Binding bindingReply(){
        return BindingBuilder.bind(this.replyQueue()).to(this.exchangeReply()).with("reply_exchange_queue");
    }
    
  2. RabbitTemplate must use setReplyAddress(). You have to configure MessageListenerContainer for the reply_queue and have RabbitTemplate as a listener:

    @Bean
    public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){
        final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
        template.setReplyAddress(replyQueue().getName());
        return template;
    }
    
    @Bean
    public MessageListenerContainer replyContainer(RabbitTemplate template) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(template.getConnectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(template);
        return container;
    }
    
  3. Your OuboundService with org.springframework.amqp.core.Message manipulation is useless. The Channel Adapters don't know about this type of payload and your custom Message just becomes as a serialized body of another org.springframework.amqp.core.Message. I have changed it to this and everything works well:

    public String createRequest(String message){
        System.out.println("Inside createRequest : "+ message);
        return message;
    }
    
    
    public Message processRequest(Message message){
        System.out.println("Inside process Request : " + message);
        return message;
    }
    

Anyway I suggest you to rethink your design and come back to the AmqpInboundGateway.

BTW in the final solution you don't need to care about any correlation. The Framework does that for you automatically.