
We have a Spring Integration application and are getting out-of-memory errors during high throughput (a production batch process). The application uses spring-integration-core release 4.3.10. We use the org.springframework.integration.annotation.* annotations in our application and on our Aggregator object (@MessageEndpoint/@Autowired/@Aggregator), which has a custom 'aggregate' method.

In heap dumps, I see SimpleMessageGroup instances (and other associated objects) left over both from high-throughput runs and from individual SoapUI requests. Please see the attached screenshots.

I have read that there are a number of properties that can be set on the 'AggregatingMessageHandler' object. To gain access to these properties while still using 'spring.integration.annotation' and a custom 'aggregate' method, I changed my annotated Aggregator object to extend 'AggregatingMessageHandler'.

@MessageEndpoint
public class MyAggregator extends AggregatingMessageHandler {

    private final MyLogger myTransactionLogger;

    @Autowired
    public MyAggregator(MyLogger myTransactionLogger) {
        super(new DefaultAggregatingMessageGroupProcessor(), new SimpleMessageStore());
        this.setSendPartialResultOnExpiry(false);
        this.setExpireGroupsUponCompletion(true);
        //this.setMinimumTimeoutForEmptyGroups(500);
        this.setGroupTimeoutExpression(new ValueExpression<>(1000L));
        this.setExpireGroupsUponTimeout(true);
        this.myTransactionLogger = myTransactionLogger;
    }

    @Aggregator(inputChannel = "serviceResponseChannel", outputChannel = "aggregatorResponseChannel")
    public <T> IndividualInquiryResponse aggregate(List<Message> serviceResponses) {...custom code }
}

I have tried different settings and combinations of the above variables, but I continue to see the same behavior.

[Screenshots: SoapUI request details of SimpleMessageGroup, instance one; SoapUI request details of SimpleMessageGroup, instance two; high-throughput run]

Any assistance would be greatly appreciated.

Update June 6, 2018: Per instruction from Artem Bilan, I successfully configured the Aggregator by using the @Bean/@ServiceActivator annotations with AggregatorFactoryBean. (The MyAggregator object does not contain any annotations.)

@Bean
@ServiceActivator(inputChannel = "serviceResponseChannel")
FactoryBean<MessageHandler> aggregatorFactoryBean( ) {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(new MyAggregator(myTransactionLogger()));
    aggregatorFactoryBean.setMethodName("aggregate");
    aggregatorFactoryBean.setMessageStore(new SimpleMessageStore());
    aggregatorFactoryBean.setOutputChannel(aggregatorResponseChannel());
    aggregatorFactoryBean.setExpireGroupsUponTimeout(true);
    aggregatorFactoryBean.setGroupTimeoutExpression(new ValueExpression<>(1900L));// @MessagingGateway defaultReplyTimeout=2000
    aggregatorFactoryBean.setSendPartialResultOnExpiry(false);

    aggregatorFactoryBean.setExpireGroupsUponCompletion(true);

    return aggregatorFactoryBean;
}

I am receiving responses as expected, but I still have memory leak issues with SimpleMessageGroup and associated objects: one SimpleMessageGroup per request.


2 Answers

1
votes
If you are using a custom aggregate method within an Aggregator object, define an AggregatorFactoryBean similar to the following. Your Aggregator object should not contain any annotations.

@Bean
@ServiceActivator(inputChannel = "serviceResponseChannel")
FactoryBean<MessageHandler> aggregatorFactoryBean( ) {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(new MyAggregator(myTransactionLogger()));//your Aggregator object
    aggregatorFactoryBean.setMethodName("aggregate");//your aggregator method
    aggregatorFactoryBean.setMessageStore(new SimpleMessageStore());
    aggregatorFactoryBean.setOutputChannel(aggregatorResponseChannel());
    aggregatorFactoryBean.setSendPartialResultOnExpiry(false);    
    aggregatorFactoryBean.setExpireGroupsUponCompletion(true);    
    aggregatorFactoryBean.setExpireGroupsUponTimeout(true);
    aggregatorFactoryBean.setGroupTimeoutExpression(new ValueExpression<>(1900L));// @MessagingGateway defaultReplyTimeout=2000
    return aggregatorFactoryBean;
}

public class MyAggregator {
    private final MyLogger myTransactionLogger;
    public MyAggregator( MyLogger myTransactionLogger) {
       this.myTransactionLogger = myTransactionLogger;
    }
    public <T> InquiryResponse aggregate(List<Message> serviceResponses) {
       // your aggregation code .....
    }
}
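
As a rough mental model of what the timeout-related settings in the configuration above are meant to achieve, the sketch below expires groups that are older than a timeout and either emits or discards their partial contents. It is a simplified, self-contained illustration, not Spring Integration code: GroupTimeoutModel and all of its methods are hypothetical names, the caller passes the clock in explicitly, and the timeout is measured from group creation, which is an assumption made for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of timeout-based group expiry; not a Spring Integration class.
public class GroupTimeoutModel {

    // One pending group: when it was created and what it has collected so far.
    private static final class Group {
        final long createdAt;
        final List<String> payloads = new ArrayList<>();
        Group(long createdAt) { this.createdAt = createdAt; }
    }

    private final Map<String, Group> groups = new HashMap<>();
    private final long groupTimeoutMillis;
    private final boolean sendPartialResultOnExpiry;

    GroupTimeoutModel(long groupTimeoutMillis, boolean sendPartialResultOnExpiry) {
        this.groupTimeoutMillis = groupTimeoutMillis;
        this.sendPartialResultOnExpiry = sendPartialResultOnExpiry;
    }

    // Stores a payload in the group for its correlation id, creating the group if needed.
    void add(String correlationId, String payload, long nowMillis) {
        groups.computeIfAbsent(correlationId, k -> new Group(nowMillis)).payloads.add(payload);
    }

    // Removes every group at least as old as the timeout. Returns the partial
    // results that were emitted; the list is empty when partial results are discarded.
    List<List<String>> expire(long nowMillis) {
        List<List<String>> emitted = new ArrayList<>();
        Iterator<Group> it = groups.values().iterator();
        while (it.hasNext()) {
            Group g = it.next();
            if (nowMillis - g.createdAt >= groupTimeoutMillis) {
                if (sendPartialResultOnExpiry) {
                    emitted.add(g.payloads);
                }
                it.remove(); // the expired group no longer occupies memory
            }
        }
        return emitted;
    }

    int groupsInMemory() {
        return groups.size();
    }

    public static void main(String[] args) {
        GroupTimeoutModel model = new GroupTimeoutModel(1900L, false);
        model.add("req-1", "a", 0L);
        model.expire(1000L);
        System.out.println(model.groupsInMemory()); // prints 1: not timed out yet
        model.expire(1900L);
        System.out.println(model.groupsInMemory()); // prints 0: expired and discarded
    }
}
```

In this model, a group that never completes is still removed once it times out. If a heap dump shows groups surviving well past the timeout, that suggests the expiry step is not running for them.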
0
votes

Please share the ReleaseStrategy you use for your use case.

By default, an aggregator groups messages by the correlationId header and decides when to release a group based on the sequenceSize header. If that condition is never met, the group is not released and remains in memory.

That said, consider using @ServiceActivator on an AggregatorFactoryBean @Bean instead.
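
To illustrate the default correlation and release behaviour described in this answer, here is a minimal, self-contained sketch. It is a simplified model and not the framework's implementation: DefaultAggregationModel, Msg, and their methods are hypothetical names, and the only assumption carried over from the answer is that messages are grouped by a correlation id and a group is released once it holds sequenceSize messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of correlation-by-id and release-by-sequence-size.
public class DefaultAggregationModel {

    // Stand-in for a message carrying the two headers the default strategies consult.
    static final class Msg {
        final String correlationId;
        final int sequenceSize;
        final String payload;

        Msg(String correlationId, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    // In-memory store: one list of messages per correlation id.
    private final Map<String, List<Msg>> groups = new HashMap<>();

    // Adds the message to its group. Returns the group's payloads if the group
    // is now complete, or null if it is still waiting for more messages.
    List<String> handle(Msg m) {
        List<Msg> group = groups.computeIfAbsent(m.correlationId, k -> new ArrayList<>());
        group.add(m);
        if (group.size() < m.sequenceSize) {
            return null; // release condition not met: the group stays in the store
        }
        groups.remove(m.correlationId); // released groups are dropped from the store
        List<String> payloads = new ArrayList<>();
        for (Msg x : group) {
            payloads.add(x.payload);
        }
        return payloads;
    }

    int groupsInMemory() {
        return groups.size();
    }

    public static void main(String[] args) {
        DefaultAggregationModel agg = new DefaultAggregationModel();
        agg.handle(new Msg("req-1", 2, "a"));
        System.out.println(agg.handle(new Msg("req-1", 2, "b"))); // prints [a, b]
        agg.handle(new Msg("req-2", 3, "c")); // only 1 of 3 expected messages arrives
        System.out.println(agg.groupsInMemory()); // prints 1: req-2 is never released
    }
}
```

In this model, the "req-2" group is the kind of object that would accumulate on the heap if the release condition were never satisfied and nothing else expired the group.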