We have a small Spring Integration channel setup that leaks memory when the application is under high load. We created this sample application solely to test the Spring aggregator.
SI Application Sample Git Project
In the Messenger class we control how long the submitting thread sleeps after passing 1000 messages to a channel before it starts submitting the next 1000.
    public void run() {
        int correlationId = 0;
        while (true) {
            // Submit one complete sequence (one aggregation group) per iteration.
            for (int sequenceNumber = 0; sequenceNumber < sequenceSize; sequenceNumber++) {
                Job jp = new Job(name);
                Message<Job> message2 = MessageBuilder.withPayload(jp)
                        .setSequenceNumber(sequenceNumber)
                        .setSequenceSize(sequenceSize)
                        .setCorrelationId(correlationId).build();
                channel.send(message2);
            }
            try {
                // Pause between batches; reducing this value is what makes heap usage grow.
                Thread.sleep(100, 0);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Correlation ids are reused once they reach CORRELATION_ID_MAX.
            correlationId++;
            if (correlationId >= CORRELATION_ID_MAX) {
                correlationId = 0;
            }
        }
    }
We believe that in normal operation, when the application does not hog memory, TreeMap$Entry accounts for much of the heap, with a large number of instances. But when we reduce the sleep time to a very low value, HashMap$Node becomes the issue: its memory utilization keeps increasing forever.
We print the delay of the list items to the terminal from the channel messageOutChannel, after receiving the aggregated message list from the aggregator.
Time : 647 ms List size : 1000 Hash : 875023460 By thread : th0 -FP
In the memory-leak situation Time keeps increasing; when there is no leak it stabilizes around some value.
Hope someone can shed some light on this and show what we are doing wrong here. Thank you.
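A steadily growing Time is what one would expect if messages are submitted faster than they are aggregated and consumed, so that a backlog builds up. The plain-Java sketch below only illustrates that assumption. It does not use Spring Integration, and the class name BacklogSketch and the per-tick rates are hypothetical, not measurements from the sample project.

```java
class BacklogSketch {

    /**
     * Returns the backlog after each tick when `produced` messages arrive
     * and at most `consumed` messages are processed per tick.
     */
    static long[] simulate(int ticks, long produced, long consumed) {
        long[] backlog = new long[ticks];
        long pending = 0;
        for (int t = 0; t < ticks; t++) {
            pending += produced;                     // new messages arrive
            pending -= Math.min(pending, consumed);  // consumer drains what it can
            backlog[t] = pending;
        }
        return backlog;
    }

    public static void main(String[] args) {
        // Hypothetical rates: consumer keeps up vs. consumer falls behind.
        long[] stable = simulate(5, 1000, 1000);
        long[] growing = simulate(5, 1000, 900);
        System.out.println("stable:  " + java.util.Arrays.toString(stable));
        System.out.println("growing: " + java.util.Arrays.toString(growing));
    }
}
```

With these made-up rates the first series stays at 0, while the second grows by 100 every tick (100, 200, 300, 400, 500). That is the same shape as a Time value that never settles.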
MessageStore: by default all messages are kept in memory until aggregation is complete. With a lot of messages you probably don't want this. By default the SimpleMessageStore is used, which uses a Map to store the messages. – M. Deinum
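To make the comment concrete, here is a minimal plain-Java sketch of what "kept in memory until aggregation is complete" means. It is not Spring Integration's SimpleMessageStore. The class GroupStoreSketch and its methods are hypothetical; they only model a Map keyed by correlation id whose entries are released once a group reaches its sequence size.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupStoreSketch {

    // Incomplete groups, keyed by correlation id (hypothetical model).
    private final Map<Integer, List<String>> groups = new HashMap<>();

    /**
     * Adds a payload to its group. Returns the full group and removes it from
     * the map once `sequenceSize` payloads have arrived; otherwise returns null.
     */
    List<String> add(int correlationId, int sequenceSize, String payload) {
        List<String> group = groups.computeIfAbsent(correlationId, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() >= sequenceSize) {
            groups.remove(correlationId);  // release the group on completion
            return group;
        }
        return null;
    }

    /** Number of payloads currently held in memory. */
    int heldMessages() {
        return groups.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        GroupStoreSketch store = new GroupStoreSketch();
        // Two groups of size 3: one completes, one is left one message short.
        for (int i = 0; i < 3; i++) store.add(1, 3, "a" + i);
        for (int i = 0; i < 2; i++) store.add(2, 3, "b" + i);
        System.out.println("held after run: " + store.heldMessages());
    }
}
```

In this model the completed group is released, while the incomplete one keeps its two payloads in the map. Any group that never completes, or completes more slowly than new groups arrive, stays on the heap. In Spring Integration itself, the aggregator's message store and its group-expiry settings are where this would be tuned; the reference documentation lists the options available in a given version.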