3
votes

I am using a publish-subscribe channel after the inbound gateway receives a String message, so that the message goes both to a logger, which logs it, and to a transformer, which transforms it. I want both these activities to happen in parallel.

My question is very simple: does the publish-subscribe channel in Spring Integration send messages to its subscribers in parallel?

Below is the code snippet from the source of spring-integration-context.xml.

<int:gateway id="gateway" service-interface="com.test.Gateway">
    </int:gateway>
<int:publish-subscribe-channel id="publishsubscribechannel" />

<int:service-activator input-channel="publishsubscribechannel"
        method="transformEvent" ref="transformer" output-channel="transformerreplychannel">
</int:service-activator>
<int:service-activator input-channel="publishsubscribechannel"
        method="logMessage" ref="logger">
</int:service-activator>

Here the transformer and the logger are the two subscribers to publishsubscribechannel. In this setup, will the message flow from the gateway to the logger and the transformer happen asynchronously by default, or do I need some other configuration to achieve that?


3 Answers

7
votes

... or using JavaConfig

@Bean
public MessageChannel publishsubscribechannel() {
    return new PublishSubscribeChannel(executor());
}

@Bean
public ThreadPoolTaskExecutor executor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(10);
    pool.setMaxPoolSize(10);
    pool.setWaitForTasksToCompleteOnShutdown(true);
    return pool;
}
6
votes

By default, the subscribers are invoked in sequence on the sender's thread, so in your case the transformer runs first, then the logger. If you want them to run in parallel, you need to specify a task-executor:

<int:publish-subscribe-channel id="publishsubscribechannel" task-executor="executor" />
...
<task:executor id="executor" pool-size="10" />

With a task-executor, the message handling is performed asynchronously.
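To make the difference concrete, here is a minimal plain-Java sketch of the two dispatch modes. It is a toy model, not Spring Integration's actual implementation: `ToyPubSubChannel`, `BroadcastSketch` and `handlerThreads` are hypothetical names made up for illustration, and the only assumption is that a channel without an executor calls each subscriber on the sender's thread, while a channel with an executor hands each call off to it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Toy model of broadcast dispatch; NOT Spring Integration's implementation. */
public class BroadcastSketch {

    /** Hypothetical stand-in for a publish-subscribe channel. */
    static final class ToyPubSubChannel {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();
        private final Executor executor; // null means "dispatch on the sender's thread"

        ToyPubSubChannel(Executor executor) {
            this.executor = executor;
        }

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String message) {
            for (Consumer<String> handler : subscribers) {
                if (executor == null) {
                    // Sequential: each handler finishes before the next one starts.
                    handler.accept(message);
                } else {
                    // Asynchronous: the handler call is handed off to the executor.
                    executor.execute(() -> handler.accept(message));
                }
            }
        }
    }

    /** Sends one message to two subscribers and returns the names of the handling threads. */
    static List<String> handlerThreads(Executor executor) {
        ToyPubSubChannel channel = new ToyPubSubChannel(executor);
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        Consumer<String> record = msg -> {
            threads.add(Thread.currentThread().getName());
            done.countDown();
        };
        channel.subscribe(record); // plays the role of the transformer
        channel.subscribe(record); // plays the role of the logger
        channel.send("hello");
        try {
            // Wait for both handlers so the returned list is complete.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("no executor:   " + handlerThreads(null));
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            System.out.println("with executor: " + handlerThreads(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Without an executor, both handlers report the caller's thread name; with the pool, they report pool thread names, which is the behaviour the task-executor attribute is meant to give you on the real channel.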

5
votes

From the Spring Integration documentation:

PublishSubscribeChannel

The PublishSubscribeChannel implementation broadcasts any Message sent to it to all of its subscribed handlers. This is most often used for sending Event Messages whose primary role is notification as opposed to Document Messages which are generally intended to be processed by a single handler. Note that the PublishSubscribeChannel is intended for sending only. Since it broadcasts to its subscribers directly when its send(Message) method is invoked, consumers cannot poll for Messages (it does not implement PollableChannel and therefore has no receive() method). Instead, any subscriber must be a MessageHandler itself, and the subscriber's handleMessage(Message) method will be invoked in turn.

Prior to version 3.0, invoking the send method on a PublishSubscribeChannel that had no subscribers returned false. When used in conjunction with a MessagingTemplate, a MessageDeliveryException was thrown. Starting with version 3.0, the behavior has changed such that a send is always considered successful if at least the minimum subscribers are present (and successfully handle the message). This behavior can be modified by setting the minSubscribers property, which defaults to 0.

Note: If a TaskExecutor is used, only the presence of the correct number of subscribers is used for this determination, because the actual handling of the message is performed asynchronously.

Pay attention to the note: it states that when a TaskExecutor is used, the message handling is asynchronous.

So yes, you have to add a TaskExecutor for the dispatch to be asynchronous.