I have a simple Camel route that takes a list of items, splits it, sends each element to an MQ node for processing, and then joins the results back together via an aggregator.

It is very close to the Composed Message Processor pattern: http://camel.apache.org/composed-message-processor.html

But we noticed that after the split, Camel seems to create multiple concurrent consumers (or exchanges?). Because the messages are spread across multiple consumers, the aggregations never complete.

List: 1,2,3,4

Split: amq::process_each_item

Aggregate:

[Camel (camel-3) thread #41 - Aggregating 1 - Waiting on 3 more items
[Camel (camel-1) thread #16 - Aggregating 2 - Waiting on 3 more items
[Camel (camel-3) thread #49 - Aggregating 3 - Waiting on 2 more items
[Camel (camel-1) thread #15 - Aggregating 4 - Waiting on 2 more items

So Camel spawned 2 aggregators, and each is waiting on 4 items, but each only ever gets two.

Camel Route:

<camelContext xmlns="http://camel.apache.org/schema/spring">

        <route> <!-- This route splits the reg request into its items, adding needed info to the message header. -->
            <from uri="activemq:registration.splitByItemQueue" />  <!-- pick up the reg req -->
            <setHeader headerName="regReqId"> <!-- Need to store the Reg Req in the header  -->
                <simple>${body.registrationRequest.id}</simple>
            </setHeader>

            <split parallelProcessing="false" strategyRef="groupedExchangeAggregator"> <!-- Split the RegRequestInfo into its individual requestItems (add, drop, etc) -->
                <method ref="requestSplitter"  method="split" />   <!-- does the actual splitting -->
                <setHeader headerName="JMSXGroupID"> <!-- This is CRITICAL. It is how we ensure valid seat check counts without db locking -->
                    <simple>FOID=${body.formatOfferingId}</simple>  <!-- grouping on the foid -->
                </setHeader>
                <to uri="activemq:registration.lprActionQueue"/> <!-- send to the queue for processing -->
            </split>
        </route>

        <route>    <!-- performs the registration + seat check -->
            <from uri="activemq:registration.lprActionQueue" />

            <bean ref="actionProcessor" method="process"/> <!-- go to the java code that makes all the decisions -->
            <to uri="activemq:registration.regReqItemJoinQueue"/> <!-- send to the join queue for final processing -->
        </route>

        <route>    <!-- This route joins items from the reg req item split. Once all items have completed, update state-->
            <from uri="activemq:registration.regReqItemJoinQueue" />  <!-- Every Reg Req Item will come here-->
            <aggregate strategyRef="groupedExchangeAggregator" ignoreInvalidCorrelationKeys="false" completionFromBatchConsumer="true"> <!-- take all the Reg Req Items and join them to their req -->
                <correlationExpression>
                    <header>regReqId</header> <!-- correlate on the regReqId we stored in the header -->
                </correlationExpression>

                <bean ref="actionProcessor" method="updateRegistrationRequestStatus"/> <!-- update status -->                   
            </aggregate>
        </route>
</camelContext>

<bean id="groupedExchangeAggregator" class="org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy" />

On my local machine the above works fine, but when we deploy to our test server, half of the messages go to one Camel aggregator and half to the other, so neither ever finishes. Notice in the config below that we've set concurrentConsumers to 1 for Camel.

Here's the Camel / ActiveMQ config:

<amq:broker useJmx="false" persistent="false">
        <amq:plugins>
            <amq:statisticsBrokerPlugin />
        </amq:plugins>
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:0" />
        </amq:transportConnectors>
    </amq:broker>

    <!-- Basic AMQ connection factory -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="vm://localhost" />

    <!-- Wraps the AMQ connection factory in Spring's caching (ie: pooled) factory
         From the AMQ "Spring Support"-page: "You can use the PooledConnectionFactory for efficient pooling... or you
         can use the Spring JMS CachingConnectionFactory to achieve the same effect."
         See "Consuming JMS from inside Spring" at http://activemq.apache.org/spring-support.html
         Also see http://codedependents.com/2010/07/14/connectionfactories-and-caching-with-spring-and-activemq/

         Note: there are pros/cons to using Spring's caching factory vs Apache's PooledConnectionFactory; but, until
         we have more explicit reasons to favor one over the other, Spring's is less tightly-coupled to a specific
         AMQP-implementation.
         See http://stackoverflow.com/a/19594974
    -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
    </bean>

    <bean id="jmsConfig"
          class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="concurrentConsumers" value="1"/>
    </bean>

    <bean id="activemq"
          class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>

1 Answer

It turns out we had another Spring context / servlet importing our config. We believe this was the issue.
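
For anyone hitting the same symptom, here is a rough sketch of how this kind of duplication can arise in a Spring web application. It is an illustration only: the file names (camel-config.xml, dispatcher-servlet.xml) and the servlet name are hypothetical, not taken from our project. If the root context loads the Camel config and the servlet context loads or imports the same file again, Spring builds two CamelContexts. Each one attaches its own consumer to the same queues and keeps its own in-memory aggregation state, which would match the two different context names (camel-1 and camel-3) in the thread names logged above.

```xml
<!-- web.xml (all file and servlet names here are hypothetical) -->
<web-app xmlns="http://java.sun.com/xml/ns/javaee" version="3.0">

    <!-- Root application context: loads the Camel / ActiveMQ config once -->
    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>/WEB-INF/camel-config.xml</param-value>
    </context-param>
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>

    <servlet>
        <servlet-name>dispatcher</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <!-- Problem case: also listing camel-config.xml here, or importing it
                 from dispatcher-servlet.xml, would start a second CamelContext. -->
            <param-value>/WEB-INF/dispatcher-servlet.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>
</web-app>
```

Keeping the Camel config in exactly one of the two contexts should leave a single consumer on each queue, so all items for a given regReqId reach the same aggregator.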