I am trying to write a simple message flow using Spring Integration v4's DSL APIs which would look like this:
-> in.ch -> Processing -> JmsGatewayOut -> JMS_OUT_QUEUE
Gateway
<- out.ch <- Processing <- JmsGatewayIn <- JMS_IN_QUEUE
With the request/response being asynchronous, when I inject a message via the initial Gateway, the message goes all the way to JMS_OUT_QUEUE. Beyond this message flow, a reply message is put back into JMS_IN_QUEUE which it is then picked up by JmsGatewayIn. At this point, the message is Processed and placed into out.ch (I know the response gets to out.ch because I have a logger interceptor there which logs the message being placed there) but, the Gateway never receives the response.
Instead of a response, the system outside of this message flow which picked up the message from JMS_OUT_QUEUE and placed the response in JMS_IN_QUEUE, receives a javax.jms.MessageFormatException: MQJMS1061: Unable to deserialize object
on its own JmsOutboundgateway (I think it is failing to deserialize a jms reply object from looking at the logs).
I have clearly not got something configured correctly but I don't know exactly what. Does anyone know what I am missing?
Working with spring-integration-core-4.0.3.RELEASE, spring-integration-jms-4.0.3.RELEASE, spring-integration-java-dsl-1.0.0.M2, spring-jms-4.0.6.RELEASE.
My Gateway is configured as follows:
@MessagingGateway
public interface WsGateway {
@Gateway(requestChannel = "in.ch", replyChannel = "out.ch",
replyTimeout = 45000)
AResponse process(ARequest request);
}
My Integration flow is configured as follows:
@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class IntegrationConfig {
@Bean(name = "in.ch")
public DirectChannel inCh() {
return new DirectChannel();
}
@Bean(name = "out.ch")
public DirectChannel outCh() {
return new DirectChannel();
}
@Autowired
private MQQueueConnectionFactory mqConnectionFactory;
@Bean
public IntegrationFlow requestFlow() {
return IntegrationFlows.from("in.ch")
.handle("processor", "processARequest")
.handle(Jms.outboundGateway(mqConnectionFactory)
.requestDestination("JMS_OUT_QUEUE")
.correlationKey("JMSCorrelationID")
.get();
}
@Bean
public IntegrationFlow responseFlow() {
return IntegrationFlows.from(Jms.inboundGateway(mqConnectionFactory)
.destination("JMS_IN_QUEUE"))
.handle("processor", "processAResponse")
.channel("out.ch")
.get();
}
}
Thanks for any help on this, PM.