
I have a simple spring integration app, where I'm attempting to publish a task to a queue-channel, and then have a worker pick up the task and execute it. (from a pool with multiple concurrent workers available).

I'm finding that the thread pool is quickly exhausted, and tasks are rejected.

Here's my config:

<int:annotation-config />
<task:annotation-driven executor="executor" scheduler="scheduler"/>
<task:executor id="executor" pool-size="5-20" rejection-policy="CALLER_RUNS" />
<task:scheduler id="scheduler" pool-size="5"/>


<int:gateway service-interface="com.example.MyGateway">
    <int:method name="queueForSync" request-channel="worker.channel" />
</int:gateway>
<int:channel id="worker.channel">
    <int:queue />
</int:channel>

<bean class="com.example.WorkerBean" id="workerBean" />
<int:service-activator ref="workerBean" method="doWork" input-channel="worker.channel">
    <int:poller fixed-delay="50" task-executor="executor" receive-timeout="0" />
</int:service-activator>

This question is very similar to another I asked a while back, here. The main difference is that I'm not using an AMQP message broker here, just internal spring message channels.

I haven't been able to find an analogy for the concurrent-consumer concept in vanilla Spring Integration channels.
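The nearest thing I could find is an executor-backed dispatcher channel, though I'm not sure it's equivalent to concurrent consumers. A sketch (reusing the executor bean above; I don't have this deployed):

```xml
<!-- A direct channel whose dispatcher invokes subscribers on the executor's
     threads, so concurrency comes from the pool itself rather than from
     poller threads. Note this drops the queue semantics of my current config:
     the sender hands the message straight to the executor. -->
<int:channel id="worker.channel">
    <int:dispatcher task-executor="executor" />
</int:channel>

<!-- No poller needed: the dispatcher dispatches on the executor's threads. -->
<int:service-activator ref="workerBean" method="doWork" input-channel="worker.channel" />
```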

I've also adopted Gary Russell's suggested config from that question:

To avoid this, simply set the receive-timeout to 0 on the <poller/>

Despite that, the pool is still being exhausted.

What's the correct configuration for this goal?

As an aside, two other smells here suggest that my config is wrong:

  • Why am I getting rejected exceptions when the rejection-policy is CALLER_RUNS?
  • The exceptions start occurring when the queued tasks = 1000. Given there's no queue-capacity on the executor, shouldn't the queue be unbounded?

Here's the exception stack trace:

[Mon Dec 2013 17:44:57.172] ERROR [task-scheduler-6] (org.springframework.integration.handler.LoggingHandler:126) - org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@48e83911[Running, pool size = 20, active threads = 20, queued tasks = 1000, completed tasks = 48]] did not accept task: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@a5798e
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:244)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:231)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.integration.util.ErrorHandlingTaskExecutor$1@a5798e rejected from java.util.concurrent.ThreadPoolExecutor@48e83911[Running, pool size = 20, active threads = 20, queued tasks = 1000, completed tasks = 48]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:241)
    ... 11 more
Downvoter - care to comment why? - Marty Pitt

1 Answer


My best guess is that you have another executor bean defined somewhere else in the context.

Turn on debug logging and look for ...DefaultListableBeanFactory] Overriding bean definition for bean 'executor'.

The default queue-capacity for <task:executor> is Integer.MAX_VALUE (effectively unbounded). The fact that tasks are rejected at exactly 1000 queued tasks, and that your stack trace shows the default AbortPolicy rather than CALLER_RUNS, both point to a different executor definition being in effect than the one you posted.
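To illustrate the rejection behavior you're seeing: a plain ThreadPoolExecutor with a bounded queue and the default AbortPolicy fails exactly the way your trace shows. A minimal standalone sketch (pure java.util.concurrent, no Spring; the small sizes are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        // One worker thread, a bounded queue of 2, and the implicit default
        // AbortPolicy (the policy visible in the stack trace above).
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(2));

        CountDownLatch block = new CountDownLatch(1);
        Runnable blocker = () -> {
            try { block.await(); } catch (InterruptedException ignored) { }
        };

        executor.execute(blocker); // occupies the single worker thread
        executor.execute(blocker); // queued (1 of 2)
        executor.execute(blocker); // queued (2 of 2)

        boolean rejected = false;
        try {
            executor.execute(blocker); // queue full -> AbortPolicy throws
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        System.out.println("rejected=" + rejected);

        block.countDown();
        executor.shutdown();
    }
}
```

With CALLER_RUNS actually configured, the fourth execute would have run on the submitting thread instead of throwing, which is another sign your posted <task:executor> definition is being overridden.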