1
votes

I want to collect some messages(lets say 10) and pass them as a list to the service activator instead of passing them one by one.

The context:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns=...>

    <int:channel id="ch.http.in"/>
    <int:channel id="ch.http.trans"/>
    <int:channel id="ch.http.aggr"/>
    <int-http:inbound-channel-adapter path="test" channel="ch.http.in"/>

    <int:map-to-object-transformer input-channel="ch.http.in" output-channel="ch.http.trans" type="demo.Req"/>
    <int:aggregator 
        input-channel="ch.http.trans" 
        output-channel="ch.http.aggr"
        release-strategy-expression="size() == 10"
        correlation-strategy-expression="headers['id']"
        ref="aggr" method="add"/>
    <int:service-activator ref="srv" method="httpTest" input-channel="ch.http.aggr"/>

    <bean id="srv" class="demo.IntService"/>
    <bean id="aggr" class="demo.HttpAggregator"/>
</beans>

The aggreagator:

public class HttpAggregator{
    public List<Req> add(List<Req> reqs) {
        System.out.println(reqs);
        return reqs;
      }
}

The service:

public class IntService {
    public void httpTest(Req msg){
        System.out.println(msg);
    }
}

Req is just a POJO.

The problem is that the aggregator method is never called. Without the aggregtor the messages are passed to the service activator with no problem. Using Spring Integration 3.0.2.RELEASE (Spring Boot 1.0.2.RELEASE)

Edit: When I changed correlation-strategy-expression="headers['id']" to correlation-strategy-expression="payload.id"(the Req object has property id) it works when I pass different ids for every chunk(e.g. id=1 for the first 10; 2 for the next 10...) Looks that that's how the correlation strategy works. How can I baypass it? I just want to limit the size of the aggregated list.

1

1 Answers

4
votes

Right; you have to correlate on something; using the headers['id'] will end up with lots of group of 1 item which will never meet the release strategy.

For a simple use case like yours, correlate on a literal - e.g. correlation-expression="'foo'" and set expire-groups-on-completion="true". This resets the group after the release, so a new one (with the same correlation id) can start on the next message.

If you want to release a partial group after some timeout, you will need a MessageGroupStoreReaper. Or, if you can upgrade to 4.0.x, the aggregator now has a group-timeout (or group-timeout-expression).