2
votes

Issue: Stream input only works for 1 input sending to the aggregator for the output-channel 'out'. Subsequent messages only go to the discard-channel 'logLateArrivers'. What condition is being used to send to the discard-channel?

Description: Trying to port the Spring integration example for basic jms using a aggegator using WebSphere.

UPDATE: - Turning on debug shows that the poller is working. Messages are pulled and put to the MQ and the response is picked up. However for the MQ scenario after the first set of messages, the AggregatingMessageHandler is not used. Messages are sent to the 'logLateArrivers' adapter on the discard channel vs channel 'out' for output. I'm re-wording the issue statement to be more specific.

Spring Integration Example: Spring Integration Github Example

Output using Spring Integration Example:

test1
test2
[TEST1, TEST1]
[TEST2, TEST2]

Output using Spring Integration with Websphere

test1
test2
[TEST1, TEST2]
[TEST1, TEST2]

Ported Changes using Websphere MQ

  1. common.xml

    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <bean class="com.ibm.mq.jms.MQConnectionFactory">
                <property name="channel" value="channelName" />
                <property name="hostName" value="host1234" />
                <property name="port" value="1111" />
                <property name="queueManager" value="testqmgr" />
                <property name="transportType" value="1" /> 
            </bean>
        </property>
        <property name="sessionCacheSize" value="10"/>
    </bean>
    
    <bean id="requestQueue" class="com.ibm.mq.jms.MQQueue">
        <constructor-arg value="requestQueue"/>
    </bean>
    
    <bean id="requestTopic" class="com.ibm.mq.jms.MQTopic">
        <constructor-arg value="topic.demo"/>
    </bean>
    
    <bean id="replyQueue" class="com.ibm.mq.jms.MQQueue">
        <constructor-arg value="replyQueue"/>
    </bean>
    
    <!-- Poller that is the stream in channel for console input -->
    <integration:poller id="poller" default="true" fixed-delay="1000"/>
    

  2. Aggregation.xml

    <int-stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/>
    
    <int:channel id="stdinToJmsoutChannel"/>
    
    <int:chain input-channel="stdinToJmsoutChannel">
        <int:header-enricher>
            <int:header name="jms_replyTo" ref="replyQueue" />
        </int:header-enricher>
        <int-jms:outbound-channel-adapter destination="requestTopic" />
    </int:chain>
    
    <int-jms:message-driven-channel-adapter channel="jmsReplyChannel"
        destination="replyQueue"/>
    
    <int:channel id="jmsReplyChannel" />
    
    <int:aggregator input-channel="jmsReplyChannel" output-channel="out"
        group-timeout="5000"
        expire-groups-upon-timeout="false"
        send-partial-result-on-expiry="true"
        discard-channel="logLateArrivers"
        correlation-strategy-expression="headers['jms_correlationId']"
        release-strategy-expression="size() == 2"/>
    
    <int:logging-channel-adapter id="logLateArrivers" />
    
    <!-- Subscribers -->
    
    <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />
    
    <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />
    
    <int:transformer input-channel="upcase" expression="payload.toUpperCase()" />
    
    <!-- Profiles -->
    
    <beans profile="default">
    
        <int-stream:stdout-channel-adapter id="out" append-newline="true"/>
    
    </beans>
    
    <beans profile="testCase">
    
        <int:bridge input-channel="out" output-channel="queueChannel"/>
    
        <int:channel id="queueChannel">
            <int:queue />
        </int:channel>
    
    </beans>
    

1

1 Answers

1
votes

The messages should be correlated on jms_correlationId. Turn on DEBUG logging and compare the message flow between the sample and your version. Perhaps the correlation id is not being set up correctly.

The inbound gateways use this logic...

replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());

So the messages associated with each request should get the same jms_correlationId when sent to the aggregator.

Your test indicates that both messages, somehow, have the same message id.

EDIT

Messages arriving with the same correlation id (in this case headers['jms_correlationId']) will be discarded (late arrivers), unless expire-groups-upon-completion="true" - which allows a new group to start instead of discarding. You need to figure out why the second group has the same correlation id as the first.