1
votes

Version : spring-integration-core - 2.2.3

Here is the simplified version of my splitter/aggregator setup.

<task:executor id="taskExecutor" pool-size="${pool.size}"
               queue-capacity="${queue.capacity}" 
       rejection-policy="CALLER_RUNS" keep-alive="120"/>

<int:channel id="service-requests"/>
<int:channel id="service-request"/>
<int:channel id="channel-1">
    <int:dispatcher task-executor="taskExecutor" failover="false"/>
</int:channel>
<int:channel id="channel-2">
    <int:dispatcher task-executor="taskExecutor" failover="false"/>
</int:channel>


<int:gateway id="myServiceRequestor" default-reply-timeout="${reply.timeout}"
             default-reply-channel="service-aggregated-reply"
             default-request-channel="service-request"
             service-interface="com.blah.blah.MyServiceRequestor"/>

<int:splitter input-channel="service-request"
              ref="serviceSplitter" output-channel="service-requests"/>

<!-- To split the request and return a java.util.Collection of Type1 and Type2 -->
<bean id="serviceSplitter" class="com.blah.blah.ServiceSplitter"/>


<int:payload-type-router input-channel="service-requests" resolution-required="true">
    <int:mapping
            type="com.blah.blah.Type1"
            channel="channel-1"/>
    <int:mapping
            type="com.blah.blah.Type2"
            channel="channel-2"/>
</int:payload-type-router>

<!-- myService is a bean where processType1 & processType2 method is there to process the payload -->
<int:service-activator input-channel="channel-1"
                       method="processType1" output-channel="service-reply" requires-reply="true"
                       ref="myService"/>

<int:service-activator input-channel="channel-2"
                       method="processType2" output-channel="service-reply" requires-reply="true"
                       ref="myService"/>

<int:publish-subscribe-channel id="service-reply" task-executor="taskExecutor"/>

<!-- myServiceAggregator has a aggregate method which takes a Collection as argument(aggregated response from myService) -->
<int:aggregator input-channel="service-reply"
                method="aggregate" ref="myServiceAggregator"
                output-channel="service-aggregated-reply"
                send-partial-result-on-expiry="false"
                message-store="myResultMessageStore"
                expire-groups-upon-completion="true"/>

<bean id="myResultMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />

<bean id="myResultMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="myResultMessageStore" />
    <property name="timeout" value="2000" />
</bean>

<task:scheduled-tasks>
    <task:scheduled ref="myResultMessageStoreReaper" method="run" fixed-rate="10000" />
</task:scheduled-tasks>

If the processType1/processType2 method in mySevice throws a RuntimeException, then it tries to send the message to an error channel(i believe spring does it by default) and the message payload in error channel stays on in heap and not getting garbage collected.

Updated More Info: For my comment on error channel. I debugged the code and found that ErrorHandlingTaskExecutor is trying to use a MessagePublishingErrorHandler which inturn sending the message to the channel returned by MessagePublishingErrorHandler.resolveErrorChannel method.

Code snippet from ErrorHandlingTaskExecutor.java

public void execute(final Runnable task) {
    this.executor.execute(new Runnable() {
        public void run() {
            try {
                task.run();
            }
            catch (Throwable t) {
                errorHandler.handleError(t);   /// This is the part which sends the message in to error channel.
            }
        }
    });
}

Code snipper from MessagePublishingErrorHandler.java

public final void handleError(Throwable t) {
    MessageChannel errorChannel = this.resolveErrorChannel(t);
    boolean sent = false;
    if (errorChannel != null) {
        try {
            if (this.sendTimeout >= 0) {
                sent = errorChannel.send(new ErrorMessage(t), this.sendTimeout);
.....

When i take a heap dump, I always see the reference to the payload message(which i believe is maintained in the above channel) and not getting GC'ed.

Would like to know what is the correct way to handle this case or if i'm missing any in my config? Also is it possible to tell spring to discard the payload(instead of sending it to error channel) in case of any exception thrown by the service activator method?

Looking forward for your inputs.

Thanks.

1

1 Answers

1
votes

You don't have an error-channel defined on your gateway so we won't send it there, we'll just throw an exception to the caller.

However, the partial group is sitting in the aggregator and will never complete. You need to configure a MessageGroupStoreReaper as shown in the reference manual (or set a group-timeout in Spring Integration 4.0.x) to discard the partial group.