I am using:

  • Spring Integration 4.1.2.RELEASE

  • Spring AMQP 1.4.3.RELEASE

I have an AMQP inbound channel adapter and a ThreadPoolTaskExecutor configured like this:

<task:executor id="exec.newItems" pool-size="5" />
<int-amqp:inbound-channel-adapter
    connection-factory="amqpConnectionFactory" auto-startup="true" 
    queue-names="#{newItemsQueueName}"
    channel="newItems.payloadType.routingChannel"
    message-converter="jsonMessageConverter"
    acknowledge-mode="AUTO" error-channel="errorChannel"
    concurrent-consumers="5"
    mapped-request-headers="*"
    channel-transacted="false"
    task-executor="exec.newItems"
    />

I would like to know what the relationship is between the value set for concurrent-consumers in the AMQP inbound channel adapter and the pool-size in the task executor configuration.

Here is what I observed by using the JVM Monitor plugin in Eclipse.

  • If concurrent-consumers is greater than pool-size and pool-size is x, then x threads are created but they are in the blocked state and messages are not processed.

  • If concurrent-consumers is equal to pool-size and pool-size is x, then x threads are created and messages are processed.

  • If concurrent-consumers is less than pool-size and concurrent-consumers is y, then y threads are created and messages are processed.

I am thinking that concurrent-consumers might be setting the maximum pool size on the executor. Is this an accurate observation?

1 Answer

I'd say that your investigation is correct.

Consumers are long-lived tasks with a while(true) loop in their Runnable.run implementation, so each of them takes one Thread from that executor and holds it forever (until the consumer dies, of course).

The default Executor in the SimpleMessageListenerContainer is a SimpleAsyncTaskExecutor. That means every new consumer gets its own thread, so it doesn't cause a blocking issue.
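To illustrate the thread-per-task behaviour, here is a minimal plain-Java sketch. It does not use Spring at all: the `threadPerTask` lambda is only a stand-in I made up to mimic how SimpleAsyncTaskExecutor starts a new thread for each task, and `ThreadPerTaskDemo` and `countStartedConsumers` are hypothetical names. Each "consumer" is just a loop that runs until interrupted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Spring AMQP.
class ThreadPerTaskDemo {

    // Starts the given number of long-running "consumers" and returns how many actually began running.
    static int countStartedConsumers(int consumers) {
        List<Thread> threads = new ArrayList<>();
        // Assumption: a stand-in for SimpleAsyncTaskExecutor, one new thread per submitted task.
        Executor threadPerTask = task -> {
            Thread t = new Thread(task);
            t.setDaemon(true);
            threads.add(t);
            t.start();
        };
        CountDownLatch started = new CountDownLatch(consumers);
        for (int i = 0; i < consumers; i++) {
            threadPerTask.execute(() -> {
                started.countDown();
                try {
                    // Simulated consumer loop: holds its thread until interrupted.
                    while (!Thread.currentThread().isInterrupted()) {
                        Thread.sleep(50);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int count = consumers - (int) started.getCount();
        threads.forEach(Thread::interrupt); // stop the simulated consumers
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countStartedConsumers(5)); // prints 5
    }
}
```

Because no thread is ever reused or queued for, the number of consumers is not limited by any pool size here.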

With a ThreadPoolTaskExecutor we really can end up with a problem when there are not enough threads in the pool: some of our consumers never get a thread and won't do their work. It gets even worse when we share that taskExecutor between different components.
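The starvation effect can be reproduced without Spring. The sketch below is only an approximation under stated assumptions: a plain `Executors.newFixedThreadPool` stands in for a `<task:executor>` with a fixed pool-size and an unbounded queue, and each "consumer" is a loop that never returns until interrupted. `ConsumerStarvationDemo` and `countStartedConsumers` are hypothetical names, and the real container's startup behaviour may differ in detail.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Spring AMQP.
class ConsumerStarvationDemo {

    // Submits long-running "consumers" to a fixed pool and returns how many actually started.
    static int countStartedConsumers(int poolSize, int consumers) {
        // Assumption: fixed pool with an unbounded queue, similar to <task:executor pool-size="N"/>.
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger started = new AtomicInteger();
        CountDownLatch expected = new CountDownLatch(Math.min(poolSize, consumers));
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                started.incrementAndGet();
                expected.countDown();
                try {
                    // Simulated consumer loop: never gives its thread back to the pool.
                    while (!Thread.currentThread().isInterrupted()) {
                        Thread.sleep(50);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            expected.await(5, TimeUnit.SECONDS);
            Thread.sleep(200); // queued consumers would start here if a thread were free
            int result = started.get();
            pool.shutdownNow(); // interrupt running consumers, discard queued ones
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            return started.get();
        }
    }

    public static void main(String[] args) {
        System.out.println(countStartedConsumers(5, 5)); // prints 5
        System.out.println(countStartedConsumers(3, 5)); // prints 3: two consumers stay queued
    }
}
```

With a pool of 3 and 5 consumers, the two extra consumers sit in the executor's queue indefinitely, because the first three never finish.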

It isn't a bad idea to use a managed executor in the application (especially in an application server environment), even for this kind of long-lived task, but we really should make sure the pool is sized for the concurrency we configure: at least one thread per concurrent consumer.