0
votes

As a result of Spring integration MessageQueue without polling, I have a poller that consumes messages from a queue instantly, using a custom TaskScheduler:

ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");

IntegrationFlows
  .from("inbound")
  .channel(MessageChannels.priority().get())
  .bridge(bridge -> bridge
    .taskScheduler(taskScheduler)
    .poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
  .fixedSubscriberChannel()
  .route(inboundRouter())
  .get()

Now I'd like to have multiple threads consuming concurrently, so I tried:

ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");
scheduler.setPoolSize(4);

However, since in AbstractPollingEndpoint the task scheduler schedules a synchronous poller (it's a bit complicated), only 1 thread is created. If I set the TaskExecutor to anything but SyncTaskExecutor (default), I run into a flood of scheduled tasks (see Spring integration MessageQueue without polling).

How can I concurrently consume from a queue in Spring Integration? This seems pretty basic but I couldn't find a solution.

Instead of the queue I could use an ExecutorChannel, however, (AFAIK) I then lose queue features like priority, queue size, and metrics on which I rely.

2

2 Answers

1
votes

See PollerSpec.taskExecutor():

/**
 * Specify an {@link Executor} to perform the {@code pollingTask}.
 * @param taskExecutor the {@link Executor} to use.
 * @return the spec.
 */
public PollerSpec taskExecutor(Executor taskExecutor) {

This way after scheduling the task periodically according your taskScheduler and delay, the real task is performed on a thread from that provided executor. By default it indeed performs the task on a scheduler's thread.

UPDATE

I'm not sure if this meets your requirements, but this is only way to keep your queue logic and process whatever is pull in parallel:

 .bridge(bridge -> bridge
    .taskScheduler(taskScheduler)
    .poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
 .channel(channels -> channel.executor(threadPoolExecutor()))   
 .fixedSubscriberChannel()
1
votes

I was able to solve it like this:

  • A single-threaded task scheduler that performs the polling
  • A thread pool executor with a synchronous queue

This way, the task scheduler can give each executor 1 task and blocks when no executor is free, thus not draining the source queue or spamming tasks.

  @Bean
  public IntegrationFlow extractTaskResultFlow() {
    return IntegrationFlows
      .from(ChannelNames.TASK_RESULT_QUEUE)
      .bridge(bridge -> bridge
        .taskScheduler(taskResultTaskScheduler())
        .poller(Pollers
          .fixedDelay(0)
          .taskExecutor(taskResultExecutor())
          .receiveTimeout(Long.MAX_VALUE)))
      .handle(resultProcessor)
      .channel(ChannelNames.TASK_FINALIZER_CHANNEL)
      .get();
  }

  @Bean
  public TaskExecutor taskResultExecutor() {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
      1, // corePoolSize
      8, // maximumPoolSize
      1L, // keepAliveTime
      TimeUnit.MINUTES,
      new SynchronousQueue<>(),
      new CustomizableThreadFactory("resultProcessor-")
    );
    executor.setRejectedExecutionHandler(new CallerBlocksPolicy(Long.MAX_VALUE));
    return new ErrorHandlingTaskExecutor(executor, errorHandler);
  }

  @Bean
  public TaskScheduler taskResultTaskScheduler() {
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setThreadNamePrefix("resultPoller-");
    return scheduler;
  }

(The initial example was copied from the linked question, this one now resembles my actual solution)