
I am trying to group a list of GeoJSON Features by a shared ID so that I can aggregate a single field of those Features, using split/aggregate like so:

@Bean
IntegrationFlow myFlow() {
    return IntegrationFlows.from(MY_DIRECT_CHANNEL)
            .handle(Http.outboundGateway(myRestUrl)
                    .httpMethod(HttpMethod.GET)
                    .expectedResponseType(FeatureCollection.class)
                    .mappedResponseHeaders(""))
            .split(FeatureCollection.class, FeatureCollection::getFeatures)
            .aggregate(aggregator -> aggregator
                    .outputProcessor(agg -> {
                        final List<String> collected = agg
                                .getMessages()
                                .stream()
                                .map(m -> ((Number)((Feature) m.getPayload()).getProperties().get("my_field")).intValue() + "")
                                .collect(Collectors.toList());
                        return MyPojo.builder()
                                .myId(((Number) agg.getGroupId()).longValue())
                                .myListString(String.join(",", collected))
                                .build();
                    })
                    .correlationStrategy(m -> ((Feature) m.getPayload()).getProperties().get("shared_id"))
                    // .sendPartialResultOnExpiry(true)
                    // .groupTimeout(10000) // there's got to be a better way ...
                    // .expireGroupsUponTimeout(false)
            )
            .handle(Jpa.updatingGateway(myEntityManagerFactory).namedQuery(MyPojo.QUERY_UPDATE),
                    spec -> spec.transactional(myTransactionManager))
            .nullChannel();
}

Unless I uncomment those 3 lines, the aggregator never releases the groups and the database never receives any updates. If I set groupTimeout to less than 5 seconds, some of the partial results are missing.

I expected the releaseStrategy to default to SimpleSequenceSizeReleaseStrategy, and I expected that to release all the groups automatically once every (split) Feature had been processed (the REST service message contains only 129 Features in total). Manually setting this as the releaseStrategy doesn't help.
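
For illustration, here is a minimal plain-Java sketch (no Spring Integration classes) of how a sequence-size check interacts with a custom correlation key. It assumes the release rule is "group size equals the sequence size stamped on the messages by the splitter". `SplitMessage`, `correlate`, `canRelease` and the ten made-up shared IDs are hypothetical names for this example only, not part of the flow above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SequenceSizeSketch {

    // Hypothetical stand-in for a split message: the correlation key plus the
    // sequence size stamped on it (assumed to be the size of the whole list).
    static final class SplitMessage {
        final long sharedId;
        final int sequenceSize;

        SplitMessage(long sharedId, int sequenceSize) {
            this.sharedId = sharedId;
            this.sequenceSize = sequenceSize;
        }
    }

    // Assumed release rule: a group is complete once it holds as many
    // messages as the sequence size carried by its messages.
    static boolean canRelease(List<SplitMessage> group) {
        return !group.isEmpty() && group.size() == group.get(0).sequenceSize;
    }

    // Groups messages by shared ID, as a correlation on "shared_id" would.
    static Map<Long, List<SplitMessage>> correlate(List<SplitMessage> messages) {
        Map<Long, List<SplitMessage>> groups = new LinkedHashMap<>();
        for (SplitMessage m : messages) {
            groups.computeIfAbsent(m.sharedId, k -> new ArrayList<>()).add(m);
        }
        return groups;
    }

    public static void main(String[] args) {
        int total = 129;
        List<SplitMessage> messages = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            messages.add(new SplitMessage(i % 10, total)); // 10 made-up shared IDs
        }
        correlate(messages).forEach((id, group) ->
                System.out.println("sharedId=" + id + " size=" + group.size()
                        + " releasable=" + canRelease(group)));
    }
}
```

Under that assumption, each shared-ID group holds only a fraction of the 129 messages, so no group ever reaches the stamped sequence size; only the full, ungrouped list would.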

What is the proper way to release the groups once all 129 messages have been processed?

Show your splitter code; what does it return? If it is a subclass of AbstractMessageSplitter and returns an Iterable that is not a Collection, or an Iterator, you must override the appropriate obtainSizeIfPossible() method; otherwise it can't determine the sequence size. - Gary Russell
The splitter function FeatureCollection::getFeatures looks like this: public List<Feature> getFeatures() {return features;}. Is there an example somewhere of how to use AbstractMessageSplitter with the Java DSL? - Chris
I was trying to follow the similar example in samples/dsl/cafe/lambda/Application.java, which looks like this: .split(Order.class, Order::getItems) - Chris

1 Answer


I got it to work using a transformer instead of split/aggregate:

@Bean
IntegrationFlow myFlow(MyTransformer myTransformer) {
    return IntegrationFlows.from(MY_DIRECT_CHANNEL)
            .handle(Http.outboundGateway(myRestUrl)
                    .httpMethod(HttpMethod.GET)
                    .expectedResponseType(FeatureCollection.class)
                    .mappedResponseHeaders(""))
            .transform(myTransformer)
            .split()
            .handle(Jpa.updatingGateway(myEntityManagerFactory).namedQuery(MyEntity.QUERY_UPDATE),
                    spec -> spec.transactional(myTransactionManager))
            .nullChannel();
}

And the signature of the transformer is:

@Component
public class MyTransformer implements GenericTransformer<FeatureCollection, List<MyEntity>> {

    @Override
    public List<MyEntity> transform(FeatureCollection featureCollection) {
        ...
    }
}
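
The body of the transformer is elided above. As a rough, non-authoritative sketch of the kind of grouping such a transformer could perform, the following plain-Java example groups items by a shared ID and joins one field with commas, mirroring what the outputProcessor in the question builds. `Item`, `joinBySharedId` and the sample values are hypothetical stand-ins, not the actual Feature or MyEntity types.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupBySketch {

    // Hypothetical stand-in for a Feature: only the two properties the
    // question reads ("shared_id" and "my_field").
    static final class Item {
        final long sharedId;
        final int myField;

        Item(long sharedId, int myField) {
            this.sharedId = sharedId;
            this.myField = myField;
        }
    }

    // Groups items by shared ID (keeping first-seen order) and joins each
    // group's myField values into one comma-separated string.
    static Map<Long, String> joinBySharedId(List<Item> items) {
        return items.stream().collect(Collectors.groupingBy(
                item -> item.sharedId,
                LinkedHashMap::new,
                Collectors.mapping(item -> String.valueOf(item.myField),
                        Collectors.joining(","))));
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item(1L, 10), new Item(2L, 20), new Item(1L, 11));
        joinBySharedId(items).forEach((id, joined) ->
                System.out.println(id + " -> " + joined));
    }
}
```

Because the whole list is available in one call, the grouping is complete as soon as the method returns, so no release strategy or timeout is involved.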