1
votes

I am trying to write a simple message flow using Spring Integration v4's DSL APIs which would look like this:

       -> in.ch -> Processing -> JmsGatewayOut -> JMS_OUT_QUEUE
Gateway
       <- out.ch <- Processing <- JmsGatewayIn <- JMS_IN_QUEUE

With the request/response being asynchronous, when I inject a message via the initial Gateway, the message goes all the way to JMS_OUT_QUEUE. Beyond this message flow, a reply message is put back into JMS_IN_QUEUE which it is then picked up by JmsGatewayIn. At this point, the message is Processed and placed into out.ch (I know the response gets to out.ch because I have a logger interceptor there which logs the message being placed there) but, the Gateway never receives the response.

Instead of a response, the system outside of this message flow which picked up the message from JMS_OUT_QUEUE and placed the response in JMS_IN_QUEUE, receives a javax.jms.MessageFormatException: MQJMS1061: Unable to deserialize object on its own JmsOutboundgateway (I think it is failing to deserialize a jms reply object from looking at the logs).

I have clearly not got something configured correctly but I don't know exactly what. Does anyone know what I am missing?

Working with spring-integration-core-4.0.3.RELEASE, spring-integration-jms-4.0.3.RELEASE, spring-integration-java-dsl-1.0.0.M2, spring-jms-4.0.6.RELEASE.

My Gateway is configured as follows:

@MessagingGateway
public interface WsGateway {

    @Gateway(requestChannel = "in.ch", replyChannel = "out.ch", 
        replyTimeout = 45000)
    AResponse process(ARequest request);
}

My Integration flow is configured as follows:

@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class IntegrationConfig {

    @Bean(name = "in.ch")
    public DirectChannel inCh() {
        return new DirectChannel();
    }

    @Bean(name = "out.ch")
    public DirectChannel outCh() {
        return new DirectChannel();
    }   

    @Autowired
    private MQQueueConnectionFactory mqConnectionFactory;

    @Bean
    public IntegrationFlow requestFlow() {

        return IntegrationFlows.from("in.ch")
                .handle("processor", "processARequest")
                .handle(Jms.outboundGateway(mqConnectionFactory)
                        .requestDestination("JMS_OUT_QUEUE")
                        .correlationKey("JMSCorrelationID")
                .get();
    }

    @Bean
    public IntegrationFlow responseFlow() {

        return IntegrationFlows.from(Jms.inboundGateway(mqConnectionFactory)
                .destination("JMS_IN_QUEUE"))
                .handle("processor", "processAResponse")
                .channel("out.ch")
                .get();
    }
}

Thanks for any help on this, PM.

1

1 Answers

2
votes

First of all your configuration is bad:

  1. Since you start the flow from WsGateway#process you really should wait reply there. The gateway's request/reply capability is based on TemporaryReplyChannel, which is placed to the headers as non-serializable value.

  2. As long as you wait rely on that gateway, actually there is no reason to provide the replyChannel, if you aren't going to do some publish-subscribe logic on the reply.

  3. As you send message to the JMS queue, you should understand that consumer part might be a separete remote application. And the last one might know nothing about your out.ch.

  4. The JMS request/reply capability is really based on JMSCorrelationID, but it isn't enough. The one more thing here is a ReplyTo JMS header. Hence, if you are going to send reply from the consumer you should really just rely on the JmsGatewayIn stuff.

So I'd change your code to this:

@MessagingGateway
public interface WsGateway {

    @Gateway(requestChannel = "in.ch", replyTimeout = 45000)
    AResponse process(ARequest request);
}

@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class IntegrationConfig {

    @Bean(name = "in.ch")
    public DirectChannel inCh() {
        return new DirectChannel();
    }

    @Autowired
    private MQQueueConnectionFactory mqConnectionFactory;

    @Bean
    public IntegrationFlow requestFlow() {
        return IntegrationFlows.from("in.ch")
                .handle("processor", "processARequest")
                .handle(Jms.outboundGateway(mqConnectionFactory)
                        .requestDestination("JMS_OUT_QUEUE")
                        .replyDestination("JMS_IN_QUEUE"))
                .handle("processor", "processAResponse")
                .get();
    }

}

Let me know, if it is appropriate for you or try to explian why you use two-way gateways for one one-way cases. Maybe Jms.outboundAdapter() and Jms.inboundAdapter() are more good for you?

UPDATE

How to use <header-channels-to-string> from Java DSL:

.enrichHeaders(e -> e.headerChannelsToString())