I'm using Spring Integration to build the following flow: Input Channel -> Splitter -> Transformer -> Service Activator -> Aggregator
Transformer and Service Activator are chained and executed using task-executor. During application execution, there are no issues. But when I try to run unit tests, the executor thread that's executing the Service Activator quits mysteriously if there's a long running task. To demonstrate this I have created a sample project with following configuration:
<task:executor id="executor" pool-size="20" keep-alive="120" queue-capacity="100"/>
<jms:message-driven-channel-adapter id="helloWorldJMSAdapater" destination="helloWorldJMSQueue"
channel="helloWorldChannel"/>
<int:channel id="helloWorldChannel"/>
<int:splitter id="splitter" input-channel="helloWorldChannel" output-channel="execChannel">
<bean id="stringSplitter" class="hello.Splitter"></bean>
</int:splitter>
<int:channel id="execChannel">
<int:dispatcher task-executor="executor"></int:dispatcher>
</int:channel>
<int:chain input-channel="execChannel" output-channel="aggregatorChannel">
<int:transformer>
<bean id="stringTransformer" class="hello.Transformer"></bean>
</int:transformer>
<int:service-activator id="helloWorldServiceActivator" ref="helloWorldAmqService" method="processMsg"/>
</int:chain>
<int:aggregator input-channel="aggregatorChannel" output-channel="errorChannel">
<bean class="hello.ResponseAggregator"/>
</int:aggregator>
This is the Splitter class:
public class Splitter {
public List<String> splitMessage(Message message) {
String msg = message.getPayload().toString();
return Arrays.asList(msg.split(","));
}
}
This is the Transformer class:
public class Transformer {
public String transform(Message message) {
String msg = message.getPayload().toString();
return msg+"t";
}
}
This is the Service Activator class:
@Service
public class HelloWorldAmqService {
public Message processMsg(String msg) throws InterruptedException {
DateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date date = new Date();
//Simulate long running process
if(msg.equalsIgnoreCase("1t")){
Thread.sleep(500);
System.out.println("After first sleep");
Thread.sleep(800);
}
System.out.println("*************"+ msg + " as of "+sdf.format(date)+" *********** " );
return MessageBuilder.withPayload(msg).build();
}
}
To simulate long running task, I've added a Thread.sleep() in the processMsg method.
This is the ResponseAggregator class:
@Component
public class ResponseAggregator extends AbstractAggregatingMessageGroupProcessor {
@Override
protected Message aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
StringBuilder builder = new StringBuilder();
for (Message message : group.getMessages()) {
builder.append(message.getPayload());
}
System.out.println(builder.toString());
return MessageBuilder.withPayload(builder.toString()).build();
}
}
I wrote a Unit Test to send a sample message to the channel and test the behaviour. But whenever the Service Activator thread takes more than approx 1000ms to process, the thread quits without any warning. Here's the Unit Test
@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsFlowTests {
@Autowired
private MessageChannel helloWorldChannel;
@Autowired
private HelloWorldAmqService hello;
@Autowired
@Qualifier("jmsConnectionFactory")
ConnectionFactory jmsConnectionFactory;
@Test
public void test() {
helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
assertThat(true);
}
}
One would normally expect the output to have the aggregated results i.e., 1t2t3t4t6t (The order could be different). But the application doesn't reach the aggregator at all. The thread that's responsible for processing "1t" quits and the aggregator doesn't get triggered at all as the default behaviour is to wait until all the messages arrive.
This is the response if I let the Thread sleep longer i.e., Thread.sleep(1000)
*************2t as of 2019/10/14 17:29:58 ***********
*************3t as of 2019/10/14 17:29:58 ***********
*************4t as of 2019/10/14 17:29:58 ***********
*************6t as of 2019/10/14 17:29:58 ***********
After first sleep
This is the response if I let the thread sleep for a shorter duration i.e., Thread.sleep(200)
*************2t as of 2019/10/14 17:31:53 ***********
*************4t as of 2019/10/14 17:31:53 ***********
*************6t as of 2019/10/14 17:31:53 ***********
*************3t as of 2019/10/14 17:31:53 ***********
After first sleep
*************1t as of 2019/10/14 17:31:53 ***********
2t3t6t4t1t