I'm using Spring Integration to build the following flow: Input Channel -> Splitter -> Transformer -> Service Activator -> Aggregator

The Transformer and Service Activator are chained and run on a task executor. The application itself runs without issues, but when I run unit tests, the executor thread running the Service Activator quits mysteriously whenever a task runs long. To demonstrate this, I created a sample project with the following configuration:

<task:executor id="executor" pool-size="20" keep-alive="120" queue-capacity="100"/>
<jms:message-driven-channel-adapter id="helloWorldJMSAdapater" destination="helloWorldJMSQueue"
    channel="helloWorldChannel"/>    
<int:channel id="helloWorldChannel"/>
<int:splitter id="splitter" input-channel="helloWorldChannel" output-channel="execChannel">
    <bean id="stringSplitter" class="hello.Splitter"></bean>
</int:splitter>
<int:channel id="execChannel">
    <int:dispatcher task-executor="executor"></int:dispatcher>
</int:channel>

<int:chain input-channel="execChannel" output-channel="aggregatorChannel">
    <int:transformer>
        <bean id="stringTransformer" class="hello.Transformer"></bean>
    </int:transformer>
    <int:service-activator id="helloWorldServiceActivator" ref="helloWorldAmqService" method="processMsg"/>
</int:chain>
<int:aggregator input-channel="aggregatorChannel" output-channel="errorChannel">
    <bean class="hello.ResponseAggregator"/>
</int:aggregator>

This is the Splitter class:

public class Splitter {

public List<String> splitMessage(Message message)  {
    String msg = message.getPayload().toString();
    return Arrays.asList(msg.split(","));
}

}

This is the Transformer class:

public class Transformer {

public String transform(Message message)  {
    String msg = message.getPayload().toString();
    return msg+"t";
}

}

This is the Service Activator class:


@Service
public class HelloWorldAmqService {

    public Message processMsg(String msg) throws InterruptedException {
        DateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Date date = new Date();
        //Simulate long running process
        if(msg.equalsIgnoreCase("1t")){
            Thread.sleep(500);
            System.out.println("After first sleep");
            Thread.sleep(800);
        }
        System.out.println("*************"+ msg + " as of "+sdf.format(date)+" ***********  " );
        return MessageBuilder.withPayload(msg).build();

    }
}

To simulate a long-running task, I've added Thread.sleep() calls to the processMsg method.

This is the ResponseAggregator class:

@Component
public class ResponseAggregator extends AbstractAggregatingMessageGroupProcessor {

    @Override
    protected Message aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
        StringBuilder builder = new StringBuilder();
        for (Message message : group.getMessages()) {
            builder.append(message.getPayload());
        }
        System.out.println(builder.toString());
        return MessageBuilder.withPayload(builder.toString()).build();
    }
}

I wrote a unit test that sends a sample message to the channel and checks the behaviour. But whenever the Service Activator thread takes more than approximately 1000 ms to process, the thread quits without any warning. Here's the unit test:

@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsFlowTests {

    @Autowired
    private MessageChannel helloWorldChannel;

    @Autowired
    private HelloWorldAmqService hello;

    @Autowired
    @Qualifier("jmsConnectionFactory")
    ConnectionFactory jmsConnectionFactory;

    @Test
    public void test() {

        helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
        assertThat(true);
    }

}

One would normally expect the output to contain the aggregated result, i.e., 1t2t3t4t6t (the order could differ). But the flow never reaches the aggregator: the thread responsible for processing "1t" quits, and the aggregator is never triggered because its default behaviour is to wait until all the messages arrive.

This is the output if I let the thread sleep longer, i.e., Thread.sleep(1000):

*************2t as of 2019/10/14 17:29:58 ***********  
*************3t as of 2019/10/14 17:29:58 ***********  
*************4t as of 2019/10/14 17:29:58 ***********  
*************6t as of 2019/10/14 17:29:58 ***********  


After first sleep

This is the output if I let the thread sleep for a shorter duration, i.e., Thread.sleep(200):

*************2t as of 2019/10/14 17:31:53 ***********  
*************4t as of 2019/10/14 17:31:53 ***********  
*************6t as of 2019/10/14 17:31:53 ***********  
*************3t as of 2019/10/14 17:31:53 ***********  


After first sleep
*************1t as of 2019/10/14 17:31:53 ***********  
2t3t6t4t1t
Threads can't just "die". Try turning on DEBUG logging and follow the messages through the flow. If you can't figure it out from that, post the log someplace like pastebin or a GitHub gist. - Gary Russell

1 Answer

Your problem is that your application is asynchronous, but your test does nothing to account for the multi-threaded execution.

You send a message in the test method and do nothing to wait for an output message. The main thread that started the test therefore just exits, leaving all the work still running in other threads behind.
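
The effect can be reproduced without Spring. The following is a minimal sketch using only the JDK; the class name EarlyReturnDemo, the runTask method, and the 300 ms sleep are made up for illustration, and the shutdownNow() call is only an assumed stand-in for whatever teardown happens once the test method returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class EarlyReturnDemo {

    /**
     * Submits a slow task to a worker thread and optionally waits for it.
     * Returns whether the task had finished when the caller moved on.
     */
    public static boolean runTask(boolean waitForWorker) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean finished = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);
        try {
            executor.submit(() -> {
                try {
                    Thread.sleep(300); // stands in for the slow processMsg call
                    finished.set(true);
                } catch (InterruptedException e) {
                    // The worker was interrupted before it could finish.
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
            if (waitForWorker) {
                // Block the calling thread until the worker signals completion.
                done.await(5, TimeUnit.SECONDS);
            }
            // Without the wait, this is read while the worker is still sleeping.
            return finished.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Assumption: mimics teardown after the caller returns.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("without waiting: " + runTask(false));
        System.out.println("with waiting:    " + runTask(true));
    }
}
```

On a typical run the first call should report false and the second true: the worker does not fail on its own, the caller simply stops paying attention before the work is done.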

Sending to the helloWorldChannel directly instead of dealing with a JMS destination is a good choice. The only problem is that you don't wait for the flow's result after the aggregation.

It is also odd to send the endpoint's output to the errorChannel, but you can subscribe to it from your test case before producing the message:

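@Autowired
private SubscribableChannel errorChannel;

@Test
public void test() throws Exception { // Future.get(...) throws checked exceptions
    // SettableListenableFuture is in org.springframework.util.concurrent
    SettableListenableFuture<Message<?>> messageFuture = new SettableListenableFuture<>();
    // Subscribe before sending so the aggregated message cannot be missed
    this.errorChannel.subscribe((message) -> messageFuture.set(message));
    helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
    // Block the JUnit thread until the result arrives or 10 seconds pass
    Message<?> messageToAssert = messageFuture.get(10, TimeUnit.SECONDS);
    ...
}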

This way, regardless of how the flow behaves, your main JUnit thread waits for a result in that Future (or fails once the timeout expires).
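
The same subscribe-then-wait pattern can be sketched with the JDK alone, which may help when experimenting outside a Spring context. Everything here is hypothetical and not part of the original project: the class FutureHandoffDemo, the processAsync stand-in for the flow, and the use of CompletableFuture in place of SettableListenableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class FutureHandoffDemo {

    /**
     * Hypothetical stand-in for the asynchronous flow: after a delay on a
     * worker thread, it hands the processed payload to the subscriber.
     */
    static void processAsync(ExecutorService executor, String payload,
                             Consumer<String> subscriber) {
        executor.submit(() -> {
            try {
                Thread.sleep(300); // simulated long-running work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // Mimics the split/transform/aggregate result, e.g. "1,2,3" -> "1t2t3t"
            subscriber.accept(payload.replace(",", "t") + "t");
        });
    }

    /** Sends the payload and blocks until the result arrives or 10 seconds pass. */
    public static String sendAndAwait(String payload) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            // The callback is registered as part of the send, so no result is missed.
            processAsync(executor, payload, result::complete);
            // The calling thread waits here, like the JUnit thread in the test.
            return result.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Flow did not produce a result", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndAwait("1,2,3")); // prints 1t2t3t
    }
}
```

The timeout matters as much as the wait: if the flow never produces a message, the call fails after 10 seconds instead of hanging the test run.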