0
votes

I am trying to use Spring Integration to implement a Gateway --> Splitter --> ServiceActivator --> Aggregator pattern in an event-driven fashion, backed by JMS. I expect the service activator to be multi-threaded, and any of the endpoints may be executed anywhere on a cluster, not necessarily on the originating server. I could get this working in a single JVM without using JMS (using SI channels), but I understand that SI channels will not help me scale horizontally, i.e. across multiple VMs.

Here's the configuration I have so far

    <!-- Entry point: proxies the ProcessGateway interface, sending requests to
         transaction-request and expecting replies on transaction-reply. -->
    <int:gateway id="transactionGateway"
        service-interface="com.test.abc.integration.service.ProcessGateway"
        default-request-channel="transaction-request"
        default-reply-channel="transaction-reply"
        default-reply-timeout="10000" />

<!-- JMS outbound gateway: takes messages from transaction-request, sends them to
     transactionInputQueue, and passes replies arriving on transactionOutputQueue
     to transaction-reply. -->
<int-jms:outbound-gateway id="transactionJMSGateway"
    request-channel="transaction-request"
    request-destination="transactionInputQueue"
    reply-destination="transactionOutputQueue"
    reply-channel="transaction-reply"
    correlation-key="JMSCorrelationID"
    extract-request-payload="true"
    extract-reply-payload="true">
    <!-- Listener container settings for the reply destination. -->
    <int-jms:reply-listener
        receive-timeout="5000"
        max-concurrent-consumers="20"
        max-messages-per-task="1" />
</int-jms:outbound-gateway>

<!-- Inbound gateway for the splitter: consumes transactionInputQueue, hands each
     request to splitter-input and takes replies from splitter-output.
     NOTE(review): a JMS inbound gateway presumably prefers the request's
     JMSReplyTo header over default-reply-destination, which would explain replies
     landing on transactionOutputQueue instead of processInputQueue; confirm
     against the Spring Integration reference manual. -->
<int-jms:inbound-gateway id="splitterGateWay"
    request-destination="transactionInputQueue"
    request-channel="splitter-input"
    reply-channel="splitter-output"
    default-reply-destination="processInputQueue"
    correlation-key="JMSCorrelationID"
    extract-request-payload="true"
    extract-reply-payload="true"
    concurrent-consumers="1"
    max-concurrent-consumers="1" />

<!-- Inbound gateway in front of the service activator: consumes
     processInputQueue, forwards requests to process-input and takes replies
     from process-output, with processOutputQueue as the default reply
     destination.
     NOTE(review): the id reads "serive" (likely meant "service"); left as is
     because other configuration may reference this bean id. -->
<int-jms:inbound-gateway id="seriveActivatorGateway"
    request-destination="processInputQueue"
    request-channel="process-input"
    reply-channel="process-output"
    default-reply-destination="processOutputQueue"
    correlation-key="JMSCorrelationID"
    extract-request-payload="true"
    extract-reply-payload="true"
    concurrent-consumers="1"
    max-concurrent-consumers="1"
    max-messages-per-task="1" />

<!-- Inbound gateway in front of the aggregator: consumes processOutputQueue,
     forwards requests to aggregator-input and takes replies from
     aggregator-output, with transactionOutputQueue as the default reply
     destination. -->
<int-jms:inbound-gateway id="aggregatorGateway"
    request-destination="processOutputQueue"
    request-channel="aggregator-input"
    reply-channel="aggregator-output"
    default-reply-destination="transactionOutputQueue"
    correlation-key="JMSCorrelationID"
    extract-request-payload="true"
    extract-reply-payload="true"
    concurrent-consumers="1"
    max-concurrent-consumers="1"
    max-messages-per-task="1" />


<!-- Splits messages from splitter-input using the processSplitter bean and
     sends the resulting messages to splitter-output. -->
<int:splitter id="transactionSplitter"
    ref="processSplitter"
    input-channel="splitter-input"
    output-channel="splitter-output" />

<!-- Invokes the jbpmService bean for each message on process-input; a reply
     is mandatory (requires-reply) and is sent to process-output. -->
<int:service-activator id="jbpmServiceActivator"
    ref="jbpmService"
    input-channel="process-input"
    output-channel="process-output"
    requires-reply="true" />

<!-- Aggregates messages from aggregator-input by calling
     processAggregator.aggregate, keeping group state in
     processResultMessageStore; partial results are not sent when a group
     expires. The aggregated message goes to aggregator-output. -->
<int:aggregator id="transactionAggregator"
    ref="processAggregator"
    method="aggregate"
    input-channel="aggregator-input"
    output-channel="aggregator-output"
    message-store="processResultMessageStore"
    send-partial-result-on-expiry="false" />

Before using gateways I tried using JMS-backed channels, and that approach wasn't successful either. The problem I am facing now is that the splitter replies to the transactionOutputQueue. I tried playing around with jms:header-enricher without much success. I feel that my approach to the problem (or my understanding of SI) might have a fundamental flaw. Any help or guidance is highly appreciated.

Also, the code snippet I have provided above uses a simple in-memory aggregator. I understand that if I need to get this working across the cluster I might need a JDBC-backed aggregator, but for now I am trying to get this pattern working on a single VM.

Here's the updated, working configuration based on Gary's comment:

<!-- ActiveMQ queue destinations, one bean per physical queue name.
     NOTE(review): processInputQueue is referenced by the JMS gateways in this
     configuration but is not defined in this snippet; presumably it is declared
     elsewhere. Verify. -->
<bean id="processOutputQueue"
    class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.com.abc.process.output" />
</bean>

<bean id="transactionOutputQueue"
    class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.com.abc.transaction.result" />
</bean>

<bean id="transactionInputQueue"
    class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.com.abc.transaction.input" />
</bean>

<!-- Entry point: proxies the ProcessGateway interface, sending requests to
     transaction-request and expecting the aggregated reply on
     aggregator-output. -->
<int:gateway id="transactionGateway"
    service-interface="com.test.abc.integration.service.ProcessGateway"
    default-request-channel="transaction-request"
    default-reply-channel="aggregator-output"
    default-reply-timeout="10000" />

<!-- Splits each message arriving on transaction-request using the
     processSplitter bean and sends the resulting messages to splitter-output. -->
<int:splitter id="transactionSplitter"
    ref="processSplitter"
    input-channel="transaction-request"
    output-channel="splitter-output" />


<!-- JMS outbound gateway: sends each split message from splitter-output to
     processInputQueue and passes replies arriving on processOutputQueue to
     aggregator-input. -->
<int-jms:outbound-gateway id="splitterJMSGateway"
    request-channel="splitter-output"
    request-destination="processInputQueue"
    reply-destination="processOutputQueue"
    reply-channel="aggregator-input"
    correlation-key="JMSCorrelationID"
    extract-request-payload="true"
    extract-reply-payload="true">
    <!-- Listener container settings for the reply destination. -->
    <int-jms:reply-listener
        receive-timeout="5000"
        max-concurrent-consumers="20" />
</int-jms:outbound-gateway>

<!-- Inbound gateway in front of the service activator: consumes
     processInputQueue with 5 to 10 concurrent consumers, forwards requests to
     process-input and takes replies from process-output, with
     processOutputQueue as the default reply destination.
     NOTE(review): the id reads "serive" (likely meant "service"); left as is
     because other configuration may reference this bean id. -->
<int-jms:inbound-gateway id="seriveActivatorGateway"
    request-destination="processInputQueue"
    request-channel="process-input"
    reply-channel="process-output"
    default-reply-destination="processOutputQueue"
    correlation-key="JMSCorrelationID"
    extract-request-payload="true"
    extract-reply-payload="true"
    concurrent-consumers="5"
    max-concurrent-consumers="10"
    max-messages-per-task="1" />

<!-- Invokes the jbpmService bean for each message on process-input; a reply
     is mandatory (requires-reply) and is sent to process-output. -->
<int:service-activator id="jbpmServiceActivator"
    ref="jbpmService"
    input-channel="process-input"
    output-channel="process-output"
    requires-reply="true" />


<!-- Aggregates reply messages from aggregator-input via the processAggregator
     bean, keeping group state in processResultMessageStore; partial results
     are not sent when a group expires. The aggregated message goes to
     aggregator-output. -->
<int:aggregator id="transactionAggregator"
    ref="processAggregator"
    input-channel="aggregator-input"
    output-channel="aggregator-output"
    message-store="processResultMessageStore"
    send-partial-result-on-expiry="false" />

<!-- Message store used by the aggregator for its message groups. -->
<bean id="processResultMessageStore"
    class="org.springframework.integration.store.SimpleMessageStore" />

<!-- Reaper for groups held in processResultMessageStore, configured with a
     timeout of 5000 (presumably milliseconds; confirm). -->
<bean id="processResultMessageStoreReaper"
    class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="timeout" value="5000" />
    <property name="messageGroupStore" ref="processResultMessageStore" />
</bean>

<!-- Calls the reaper's run method at a fixed rate of 1000. -->
<task:scheduled-tasks>
    <task:scheduled ref="processResultMessageStoreReaper"
        method="run"
        fixed-rate="1000" />
</task:scheduled-tasks>

<!-- Logs complete messages at DEBUG level.
     NOTE(review): with no channel attribute the adapter presumably creates a
     channel named after its id ("logger"); confirm. -->
<int:logging-channel-adapter id="logger"
    level="DEBUG"
    log-full-message="true" />

<!-- Writes messages from the "logger" channel to stdout.
     NOTE(review): this looks like a second consumer on the same channel as the
     logging adapter; verify that both are meant to receive every message. -->
<int-stream:stdout-channel-adapter id="stdoutAdapter"
    channel="logger" />

I limited the JMS pipeline to only the service activator, which is what I originally wanted.

The only question I have about the above approach is: do I need to have my aggregator backed by a database even if I use this across multiple VMs (since the JMS gateway in front of it makes sure that it receives only the messages that have a valid correlation ID)?

Regards ,

1

1 Answers

2
votes

You probably don't need to use JMS between every component. However we have lots of test cases for chained gateways like this, and all works fine.

Something must be wired up incorrectly. Since you didn't show your full configuration, it's hard to speculate.

Be sure to use the latest version (2.2.4) and turn on DEBUG logging and follow a message through the flow; as long as your message payload is identifiable across JMS boundaries, it should be easy to figure out where things go awry.