I'd like to write incoming messages into a message queue and have them consumed by a single, dedicated thread without delay. This is very similar to the question "Spring Integration listen on queue without poller".
I tried:
IntegrationFlows
.from("inbound")
.channel(MessageChannels.queue(10_000))
.bridge(spec -> spec.poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
.fixedSubscriberChannel()
.route(inboundRouter())
.get()
But this makes a task scheduler thread run the polling operation, which then blocks until a message is available. This is not my idea of a "dedicated thread", and it causes a deadlock in my application when the task scheduler threads are also used to write into the queue, because then no consumer thread is left on the other side.
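To illustrate the starvation described above outside of Spring Integration, here is a minimal sketch in plain java.util.concurrent. It is only a model of what I think is happening, not the framework's actual code: the single-threaded pool stands in for the task scheduler, and the class and method names (SchedulerStarvationDemo, pollOnSharedPool) are made up for this example. A short timeout is used so the demo terminates; with receiveTimeout(Long.MAX_VALUE) the poller would never give the thread back.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulerStarvationDemo {

    // Returns what the "poller" received; null means it timed out.
    static String pollOnSharedPool(long timeoutMillis) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10_000);
        // Hypothetical stand-in for the task scheduler: one thread only,
        // so the effect is easy to reproduce.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            // The poller occupies the only thread while waiting for a message.
            Callable<String> poller =
                    () -> queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            // The producer needs a thread from the same pool to write.
            Callable<Boolean> producer = () -> queue.offer("hello");

            Future<String> polled = scheduler.submit(poller);
            Future<Boolean> produced = scheduler.submit(producer);

            String received = polled.get(); // poller finishes first
            produced.get();                 // producer only runs afterwards
            return received;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // The producer is stuck behind the blocked poller, so the poll
        // times out and returns null even though a message was "sent".
        System.out.println("received = " + pollOnSharedPool(200));
    }
}
```

With more scheduler threads the same thing happens as soon as all of them are blocked in a poll while the writers are waiting for one of those threads.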
Next thing I tried was:
IntegrationFlows
.from("inbound")
.channel(MessageChannels.queue(10_000))
.bridge(spec -> spec.poller(Pollers.fixedDelay(0).taskExecutor(Executors.newSingleThreadExecutor()).receiveTimeout(Long.MAX_VALUE)))
.fixedSubscriberChannel()
.route(inboundRouter())
.get()
But this caused the application to spawn a gazillion scheduled tasks because of fixedDelay(0).
Next option I came across was:
IntegrationFlows
.from("inbound")
.channel(MessageChannels.executor(Executors.newSingleThreadExecutor()))
.route(inboundRouter())
.get()
This seems to work as intended; I have a dedicated thread that processes all messages. However, I no longer have a message queue whose statistics I can monitor (via JMX).
So, is there any way to achieve my goal, and how?
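For reference, the behavior I am after looks roughly like the following in plain java.util.concurrent. This is only a sketch of the goal, not a Spring Integration solution: one named thread blocks on take() so there is no polling delay, and the queue remains a real object whose size could be exposed for monitoring. The names DedicatedConsumerDemo, consume and "inbound-consumer" are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DedicatedConsumerDemo {

    // Sends the messages through a bounded queue and returns what the
    // dedicated thread handled, tagged with the handling thread's name.
    static List<String> consume(List<String> messages) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10_000);
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    // Blocks until a message arrives; no scheduler, no delay.
                    String message = queue.take();
                    handled.add(Thread.currentThread().getName() + ":" + message);
                    done.countDown();
                }
            } catch (InterruptedException e) {
                // Interrupt is used as the shutdown signal in this sketch.
                Thread.currentThread().interrupt();
            }
        }, "inbound-consumer");
        consumer.setDaemon(true);
        consumer.start();

        for (String message : messages) {
            queue.put(message); // producers may run on any thread
        }

        // Wait until everything has been handled, then stop the consumer.
        done.await(5, TimeUnit.SECONDS);
        consumer.interrupt();
        consumer.join(1_000);
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consume(Arrays.asList("a", "b", "c")));
    }
}
```

What I am missing is the equivalent of this within an IntegrationFlow, where the queue is a queue channel that I can still monitor.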