
We have a small Spring Integration channel setup that shows a memory leak when the application is under high load. We created this sample application solely to test the Spring aggregator.

SI Application Sample Git Project

Heap dumps for both scenarios

In the Messenger class we control how long the submission thread sleeps after passing 1000 messages to a channel, before it starts submitting the next 1000.

    public void run() {
        int correlationId = 0;
        while (true) {
            // Submit one complete group: sequenceSize messages sharing a correlation id.
            for (int sequenceNumber = 0; sequenceNumber < sequenceSize; sequenceNumber++) {
                Job jp = new Job(name);
                Message<Job> message2 = MessageBuilder.withPayload(jp)
                        .setSequenceNumber(sequenceNumber)
                        .setSequenceSize(sequenceSize)
                        .setCorrelationId(correlationId)
                        .build();

                channel.send(message2);
            }

            // Pause before submitting the next group; lowering this value raises the load.
            try {
                Thread.sleep(100, 0);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // Cycle through the correlation ids.
            correlationId++;
            if (correlationId >= CORRELATION_ID_MAX) {
                correlationId = 0;
            }
        }
    }

We believe that in normal operation, when memory is not being hogged, TreeMap$Entry takes up a lot of heap with a large number of instances; but when we reduce the sleep time to a very low value, HashMap$Node starts to become an issue.

Low load: no memory leak

High load: memory leak

HashMap$Node keeps increasing its memory utilization indefinitely.

We print the delay of the list items at the terminal from the messageOutChannel channel, after receiving the aggregated message list from the aggregator.

Time :   647 ms  List size : 1000   Hash : 875023460    By thread : th0  -FP

In the memory-leak situation, Time keeps increasing; when there is no leak, it stabilizes around a fixed value.

Hope someone can shed some light on this and show what we are doing wrong here. Thank you.

Use a different MessageStore. By default, all messages are kept in memory until aggregation is complete; with a lot of messages you probably don't want this. The default is SimpleMessageStore, which uses a Map to store the messages. - M. Deinum

1 Answer


This has nothing to do with the aggregator; the memory "leak" is simply because you are queuing up gazillions of tasks in the task executor and you have no limit on its queue size.
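
To illustrate the idea of limiting the queue, here is a minimal sketch using only the JDK's `ThreadPoolExecutor`. It is not the sample project's actual configuration, which is not shown here; the class name `BoundedExecutorSketch`, the helper names, and the pool and queue sizes are all assumptions chosen for illustration. The queue is an `ArrayBlockingQueue` with a fixed capacity, and `CallerRunsPolicy` makes the submitting thread run a task itself once the queue is full, which slows the producer down instead of letting tasks pile up on the heap.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a fixed-size pool whose task queue cannot grow without bound.
final class BoundedExecutorSketch {

    // Builds an executor with a bounded queue; sizes are illustrative assumptions.
    static ThreadPoolExecutor newBoundedExecutor(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                // When the queue is full, the submitting thread runs the task itself,
                // which throttles the producer.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits 'tasks' short jobs as fast as possible and returns the deepest
    // queue size observed by the submitting thread.
    static int maxQueueDepth(int threads, int queueCapacity, int tasks) {
        ThreadPoolExecutor executor = newBoundedExecutor(threads, queueCapacity);
        int maxDepth = 0;
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(1); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            maxDepth = Math.max(maxDepth, executor.getQueue().size());
        }
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxDepth;
    }

    public static void main(String[] args) {
        System.out.println("max queue depth: " + maxQueueDepth(2, 10, 500));
    }
}
```

However fast the producer submits, the observed queue depth stays at or below the configured capacity. If the application uses Spring's `ThreadPoolTaskExecutor`, the same effect should be achievable through its `queueCapacity` and `rejectedExecutionHandler` properties; which values suit the real application is something to measure rather than assume.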
