I am trying to do a GroupBy a list of GeoJSON Features based on a shared ID, in order to aggregate a single field of these Features, by using split/aggregate, like so:
@Bean
IntegrationFlow myFlow() {
return IntegrationFlows.from(MY_DIRECT_CHANNEL)
.handle(Http.outboundGateway(myRestUrl)
.httpMethod(HttpMethod.GET)
.expectedResponseType(FeatureCollection.class)
.mappedResponseHeaders(""))
.split(FeatureCollection.class, FeatureCollection::getFeatures)
.aggregate(aggregator -> aggregator
.outputProcessor(agg -> {
final List<String> collected = agg
.getMessages()
.stream()
.map(m -> ((Number)((Feature) m.getPayload()).getProperties().get("my_field")).intValue() + "")
.collect(Collectors.toList());
return MyPojo.builder()
.myId(((Number) agg.getGroupId()).longValue())
.myListString(String.join(",", collected))
.build();
})
.correlationStrategy(m -> ((Feature) m.getPayload()).getProperties().get("shared_id"))
// .sendPartialResultOnExpiry(true)
// .groupTimeout(10000) // there's got to be a better way ...
// .expireGroupsUponTimeout(false)
)
.handle(Jpa.updatingGateway(myEntityManagerFactory).namedQuery(MyPojo.QUERY_UPDATE),
spec -> spec.transactional(myTransactionManager))
.nullChannel();
}
Unless I un-comment those 3 lines, the aggregator never releases the groups and the database never receives any updates. If I set groupTimeout to less than 5 seconds, I am missing partial results.
I expected the releaseStrategy to be SimpleSequenceSizeReleaseStrategy by default which I expected would automatically release all the groups after all of the (split) Features had been processed (there are only 129 Features in total from the REST service message). Manually setting this as the releaseStrategy doesn't help.
What is the proper way to release the groups once all 129 messages have been processed ?
AbstractMessageSplitterand returns anIterablethat's not aCollection, orIterator, you must override the appropriateobtainSizeIfPossible()method - otherwise it can't determine the sequence size. - Gary RussellFeatureCollection::getFeatureslooks like this:public List<Feature> getFeatures() {return features;}. Can I find an example somewhere of how to useAbstractMessageSplitterwith Java DSL ? - Chrissamples/dsl/cafe/lambda/Application.javathat looks like this:.split(Order.class, Order::getItems)- Chris