Issue: Stream input works for only the first input, whose replies reach the aggregator and are sent to the output-channel 'out'. Subsequent messages go only to the discard-channel 'logLateArrivers'. What condition does the aggregator use to send a message to the discard-channel?
Description: I am trying to port the Spring Integration basic JMS example, which uses an aggregator, to WebSphere MQ.
UPDATE: Turning on debug logging shows that the poller is working. Messages are pulled and put to MQ, and the response is picked up. However, in the MQ scenario, after the first set of messages the AggregatingMessageHandler is not used: messages are sent to the 'logLateArrivers' adapter on the discard channel rather than to channel 'out' for output. I have reworded the issue statement to be more specific.
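One way to see why a message ends up on the discard channel is to log its correlation header along with the payload. The fragment below is a hypothetical variant of the 'logLateArrivers' adapter, not part of the original port; the `level` value and the wording of the log line are assumptions. It prints the `jms_correlationId` header that the aggregator uses as its correlation key, so a key repeated across inputs would be visible in the log.

```xml
<!-- Sketch only: replaces the plain logLateArrivers adapter for diagnosis. -->
<!-- The level and the message text are illustrative assumptions. -->
<int:logging-channel-adapter id="logLateArrivers"
    level="WARN"
    expression="'Discarded: correlationId=' + headers['jms_correlationId'] + ', payload=' + payload" />
```

If every discarded message shows the same correlation ID as the first released group, the discards are tied to the correlation key rather than to the group timeout.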
Spring Integration Example: Spring Integration GitHub Example
Output using Spring Integration Example:
test1
test2
[TEST1, TEST1]
[TEST2, TEST2]
Output using Spring Integration with WebSphere MQ:
test1
test2
[TEST1, TEST2]
[TEST1, TEST2]
Ported changes using WebSphere MQ:
common.xml
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="com.ibm.mq.jms.MQConnectionFactory">
            <property name="channel" value="channelName" />
            <property name="hostName" value="host1234" />
            <property name="port" value="1111" />
            <property name="queueManager" value="testqmgr" />
            <property name="transportType" value="1" />
        </bean>
    </property>
    <property name="sessionCacheSize" value="10"/>
</bean>

<bean id="requestQueue" class="com.ibm.mq.jms.MQQueue">
    <constructor-arg value="requestQueue"/>
</bean>

<bean id="requestTopic" class="com.ibm.mq.jms.MQTopic">
    <constructor-arg value="topic.demo"/>
</bean>

<bean id="replyQueue" class="com.ibm.mq.jms.MQQueue">
    <constructor-arg value="replyQueue"/>
</bean>

<!-- Poller that is the stream in channel for console input -->
<integration:poller id="poller" default="true" fixed-delay="1000"/>
Aggregation.xml
<int-stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/>

<int:channel id="stdinToJmsoutChannel"/>

<int:chain input-channel="stdinToJmsoutChannel">
    <int:header-enricher>
        <int:header name="jms_replyTo" ref="replyQueue" />
    </int:header-enricher>
    <int-jms:outbound-channel-adapter destination="requestTopic" />
</int:chain>

<int-jms:message-driven-channel-adapter channel="jmsReplyChannel" destination="replyQueue"/>

<int:channel id="jmsReplyChannel" />

<int:aggregator input-channel="jmsReplyChannel"
    output-channel="out"
    group-timeout="5000"
    expire-groups-upon-timeout="false"
    send-partial-result-on-expiry="true"
    discard-channel="logLateArrivers"
    correlation-strategy-expression="headers['jms_correlationId']"
    release-strategy-expression="size() == 2"/>

<int:logging-channel-adapter id="logLateArrivers" />

<!-- Subscribers -->
<int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />
<int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />

<int:transformer input-channel="upcase" expression="payload.toUpperCase()" />

<!-- Profiles -->
<beans profile="default">
    <int-stream:stdout-channel-adapter id="out" append-newline="true"/>
</beans>

<beans profile="testCase">
    <int:bridge input-channel="out" output-channel="queueChannel"/>
    <int:channel id="queueChannel">
        <int:queue />
    </int:channel>
</beans>
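For context on the discard condition: according to the Spring Integration aggregator documentation, a group that has been released stays in the message store marked as complete unless `expire-groups-upon-completion` is set to true, and any later message with the same correlation key is treated as a late arriver and sent to the discard channel. The fragment below is a sketch of the same aggregator with that attribute added. It assumes the discards come from a correlation key that repeats across inputs, which has not been confirmed for the MQ setup.

```xml
<!-- Sketch only: same aggregator as above with one attribute added. -->
<!-- Assumption: late arrivers are caused by a repeated jms_correlationId. -->
<int:aggregator input-channel="jmsReplyChannel"
    output-channel="out"
    group-timeout="5000"
    expire-groups-upon-timeout="false"
    expire-groups-upon-completion="true"
    send-partial-result-on-expiry="true"
    discard-channel="logLateArrivers"
    correlation-strategy-expression="headers['jms_correlationId']"
    release-strategy-expression="size() == 2"/>
```

With this setting a completed group is removed once it is released, so a later message with the same key starts a new group instead of being discarded. It would not explain why the key repeats under WebSphere MQ in the first place.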