Following up on "Spring integration MessageQueue without polling", I have a poller that consumes messages from a queue immediately, using a custom TaskScheduler:
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");
IntegrationFlows
    .from("inbound")
    .channel(MessageChannels.priority().get())
    .bridge(bridge -> bridge
        .taskScheduler(scheduler) // the scheduler created above
        .poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
    .fixedSubscriberChannel()
    .route(inboundRouter())
    .get();
Now I'd like to have multiple threads consuming concurrently, so I tried:
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");
scheduler.setPoolSize(4);
However, AbstractPollingEndpoint has the task scheduler schedule a single synchronous poller (it's a bit complicated), so only 1 thread is created. If I set the TaskExecutor to anything other than the default SyncTaskExecutor, I run into a flood of scheduled tasks (see Spring integration MessageQueue without polling).
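To illustrate why I think the pool size makes no difference, here is a plain-JDK sketch (not Spring Integration code). As far as I understand it, a single fixed-delay task never overlaps with itself, however many threads the scheduler owns. The class name FixedDelayDemo, the method peakConcurrency, and the 20 ms sleep standing in for a blocking receive are all made up for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelayDemo {

    /**
     * Schedules ONE fixed-delay task on a pool of the given size, lets it run
     * for runMillis, and returns the highest number of overlapping executions seen.
     */
    static int peakConcurrency(int poolSize, long runMillis) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        Runnable poll = () -> {
            int now = active.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(20); // hypothetical stand-in for a blocking receive
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.decrementAndGet();
            }
        };

        // The next run is only scheduled after the previous run has finished.
        pool.scheduleWithFixedDelay(poll, 0, 1, TimeUnit.MILLISECONDS);
        Thread.sleep(runMillis);
        pool.shutdownNow();
        pool.awaitTermination(1, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrent polls: " + peakConcurrency(4, 300));
    }
}
```

Even with a pool size of 4, this should report a peak of 1, which matches what I see with the poller.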
How can I concurrently consume from a queue in Spring Integration? This seems pretty basic but I couldn't find a solution.
Instead of the queue I could use an ExecutorChannel; however, AFAIK I would then lose queue features such as priority, queue size, and metrics, all of which I rely on.
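To make the desired behavior concrete, this is roughly what I am after, written with plain JDK classes rather than Spring Integration. Several workers each block on the same priority queue, so messages are picked up immediately and in parallel. ConcurrentConsumers, consume, and the 10 ms sleep that simulates processing are hypothetical names and values for illustration only.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentConsumers {

    /**
     * Starts the given number of workers, each blocking on the shared priority
     * queue, and returns how many messages were processed in total.
     */
    static int consume(int workers, List<Integer> messages) throws InterruptedException {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(messages);
        CountDownLatch done = new CountDownLatch(messages.size());
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        queue.take();     // blocks until a message is available
                        Thread.sleep(10); // hypothetical processing time
                        processed.incrementAndGet();
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // worker stops on shutdown
                }
            });
        }

        done.await(5, TimeUnit.SECONDS); // wait until every message was handled
        pool.shutdownNow();              // interrupt workers blocked in take()
        pool.awaitTermination(1, TimeUnit.SECONDS);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed: " + consume(4, List.of(5, 1, 4, 2, 3, 8, 7, 6)));
    }
}
```

What I cannot figure out is how to get the equivalent of these four blocked workers from a polling endpoint while keeping the PriorityChannel.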