We have a Spring Integration application and are getting out-of-memory errors during high throughput (a production batch process). The application uses spring-integration-core release 4.3.10. We use the org.springframework.integration.annotation.* annotations in our application and on our Aggregator object (@MessageEndpoint/@Autowired/@Aggregator), which has a custom 'aggregate' method.
In heap dumps, I am seeing SimpleMessageGroup instances (and other associated objects) left over from high-throughput runs and also from individual SoapUI requests. Please see the attached screenshots.
I have read that there are a number of properties that can be set on the 'AggregatingMessageHandler' object. To gain access to these properties while still using the 'spring.integration.annotation' annotations with a custom 'aggregate' method, I changed my annotated Aggregator object to extend 'AggregatingMessageHandler'.
    @MessageEndpoint
    public class MyAggregator extends AggregatingMessageHandler {

        private final MyLogger myTransactionLogger;

        @Autowired
        public MyAggregator(MyLogger myTransactionLogger) {
            super(new DefaultAggregatingMessageGroupProcessor(), new SimpleMessageStore());
            this.setSendPartialResultOnExpiry(false);
            this.setExpireGroupsUponCompletion(true);
            //this.setMinimumTimeoutForEmptyGroups(500);
            this.setGroupTimeoutExpression(new ValueExpression<>(1000L));
            this.setExpireGroupsUponTimeout(true);
            this.myTransactionLogger = myTransactionLogger;
        }

        @Aggregator(inputChannel = "serviceResponseChannel", outputChannel = "aggregatorResponseChannel")
        public <T> IndividualInquiryResponse aggregate(List<Message> serviceResponses) {...custom code }
    }
I have tried different settings and combinations of the above variables, but I continue to see the same behavior.
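As background on what setExpireGroupsUponCompletion is expected to change, here is a minimal plain-Java sketch of my understanding of the documented behavior. It uses no Spring classes: GroupStoreModel and its methods are hypothetical names made up for illustration, and it models only the intent (a completed group is either removed or kept as an empty marker so that late arrivals can be discarded), not the framework's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a message group store; NOT a Spring Integration class.
class GroupStoreModel {

    private final Map<String, List<String>> groups = new HashMap<>();
    private final boolean expireGroupsUponCompletion;

    GroupStoreModel(boolean expireGroupsUponCompletion) {
        this.expireGroupsUponCompletion = expireGroupsUponCompletion;
    }

    // Add a message to the group for its correlation key, creating the group if needed.
    void add(String correlationKey, String message) {
        groups.computeIfAbsent(correlationKey, k -> new ArrayList<>()).add(message);
    }

    // Release a group: return its messages, then either remove the group
    // or keep it in the store as an empty marker.
    List<String> complete(String correlationKey) {
        List<String> group = groups.get(correlationKey);
        if (group == null) {
            return new ArrayList<>();
        }
        List<String> released = new ArrayList<>(group);
        if (expireGroupsUponCompletion) {
            groups.remove(correlationKey);
        } else {
            group.clear();
        }
        return released;
    }

    // Number of groups (including empty markers) still held in the store.
    int size() {
        return groups.size();
    }

    public static void main(String[] args) {
        for (boolean expire : new boolean[] {false, true}) {
            GroupStoreModel store = new GroupStoreModel(expire);
            // One correlation key per request, as in my application.
            for (int request = 0; request < 1000; request++) {
                String key = "request-" + request;
                store.add(key, "response-1");
                store.add(key, "response-2");
                store.complete(key);
            }
            System.out.println("expire=" + expire + " groups left=" + store.size());
        }
    }
}
```

In this model, with one correlation key per request, the store keeps 1000 empty groups when the flag is false and none when it is true. The heap dumps look like the first case even though I set the flag to true, which is why I suspect the settings are not reaching the handler that actually processes the messages.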
Any assistance would be greatly appreciated.
Update (June 6, 2018): Following instructions from Artem Bilan, I successfully configured the aggregator using the @Bean/@ServiceActivator annotations with AggregatorFactoryBean. (The MyAggregator object does not contain any annotations.)
    @Bean
    @ServiceActivator(inputChannel = "serviceResponseChannel")
    FactoryBean<MessageHandler> aggregatorFactoryBean() {
        AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
        aggregatorFactoryBean.setProcessorBean(new MyAggregator(myTransactionLogger()));
        aggregatorFactoryBean.setMethodName("aggregate");
        aggregatorFactoryBean.setMessageStore(new SimpleMessageStore());
        aggregatorFactoryBean.setOutputChannel(aggregatorResponseChannel());
        aggregatorFactoryBean.setExpireGroupsUponTimeout(true);
        // Kept just under the @MessagingGateway defaultReplyTimeout of 2000
        aggregatorFactoryBean.setGroupTimeoutExpression(new ValueExpression<>(1900L));
        aggregatorFactoryBean.setSendPartialResultOnExpiry(false);
        aggregatorFactoryBean.setExpireGroupsUponCompletion(true);
        return aggregatorFactoryBean;
    }
I am receiving responses as expected, but I still have memory leak issues with SimpleMessageGroup and associated objects: one SimpleMessageGroup is retained per request.