
I am getting spurious correlation errors using TcpOutboundGateway with CachingClientConnectionFactory in a multithreaded context.

The log message is:

    2015-05-26 14:50:38.406 ERROR 3320 --- [pool-2-thread-2] o.s.i.ip.tcp.TcpOutboundGateway : Cannot correlate response - no pending reply

I do not get the error when sending from a single thread. I have tested on 2 physical machines (Windows 7 and Fedora 20). I am using Spring Boot.

It results in a timeout error on the send that does not receive its response.

Below is my simplified code. Note that it does not always produce the error; the failure is intermittent. The code uses a TcpOutboundGateway and a TcpInboundGateway, but in my actual application the server is legacy (non-Spring) Java code, so I use CachingClientConnectionFactory to improve performance.

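import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.TcpOutboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@Configuration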
@ComponentScan
@EnableAutoConfiguration
public class Test {

    //**************** Client **********************************************
    @Bean
    public MessageChannel replyChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel sendChannel() {
        MessageChannel directChannel = new DirectChannel();
        return directChannel;
    }

    @Bean
    AbstractClientConnectionFactory tcpNetClientConnectionFactory() {
        AbstractClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory("localhost", 9003);
        CachingClientConnectionFactory cachingClientConnectionFactory = new CachingClientConnectionFactory(tcpNetClientConnectionFactory, 4);
        return cachingClientConnectionFactory;
    }

    @Bean
    @ServiceActivator(inputChannel = "sendChannel")
    TcpOutboundGateway tcpOutboundGateway() {
        TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
        tcpOutboundGateway.setConnectionFactory(tcpNetClientConnectionFactory());
        tcpOutboundGateway.setReplyChannel(replyChannel());

        return tcpOutboundGateway;
    }
    //******************************************************************


    //**************** Server **********************************************
    @Bean
    public MessageChannel receiveChannel() {
        return new DirectChannel();
    }

    @Bean
    TcpNetServerConnectionFactory tcpNetServerConnectionFactory() {
        TcpNetServerConnectionFactory tcpNetServerConnectionFactory =  new TcpNetServerConnectionFactory(9003);
        tcpNetServerConnectionFactory.setSingleUse(false);
        return tcpNetServerConnectionFactory;
    }

    @Bean
    TcpInboundGateway tcpInboundGateway() {
        TcpInboundGateway tcpInboundGateway = new TcpInboundGateway();
        tcpInboundGateway.setConnectionFactory(tcpNetServerConnectionFactory());
        tcpInboundGateway.setRequestChannel(receiveChannel());
        return tcpInboundGateway;
    }
    //******************************************************************

    @Bean
    @Scope("prototype")
    Worker worker() {
        return new Worker();
    }

    public volatile static int lc = 4;
    public volatile static int counter = lc;
    public volatile static long totStartTime = 0;
    public volatile static int messageCount = 0;

    public static synchronized int incMessageCount(){
        return ++messageCount;
    }


    public static void main(String args[]) {
        //new LegaServer();
        ConfigurableApplicationContext applicationContext = SpringApplication.run(Test.class, args);
        totStartTime = System.currentTimeMillis();

        for (int z = 0; z < lc; z++) {
            new Thread((Worker) applicationContext.getBean("worker")).start();
        }

        try {
            // sleep() is static; let the worker threads run for 20 seconds
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        applicationContext.stop();

    }
}


@MessageEndpoint
class RequestHandler {

    @ServiceActivator(inputChannel = "receiveChannel")
    public String rxHandler(byte [] in) {
        String s = new String(in);
        System.out.println("rxHandler:"+s);
        return "Blah blah " + s;
    }

}

@MessageEndpoint
class ResponseHandler {

    @ServiceActivator(inputChannel = "replyChannel")
    public void replyHandler(byte [] in) {
        System.out.println("replyHandler:"+new String(in));
    }

}

class Worker implements Runnable {

    @Autowired
    @Qualifier("sendChannel")
    MessageChannel dc;

    @Override
    public void run() {
        Test.counter--;
        int locMessageCount=0;
        long startTime = System.currentTimeMillis();
        for (int t = 0; t < 20; t++) {

            locMessageCount = Test.incMessageCount();

            // Message headers are Map<String, Object>; the value here is an Integer, not a String
            Map<String, Object> hs = new HashMap<>();
            hs.put("context", Test.counter);

            GenericMessage<String> message = new GenericMessage<>("this is a test message " + locMessageCount, hs);

            try {
                dc.send(message);
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("locMessageCount:"+locMessageCount);
            }

        }

        if (locMessageCount == (Test.lc*20)) {
            long totfinTime = System.currentTimeMillis();
            System.out.println("Tot. Time taken: " + (totfinTime - Test.totStartTime));
            System.out.println("Tot. TPS: " + (1000 * 20* Test.lc) / (totfinTime - Test.totStartTime));
            System.out.println("Tot. messages: " + Test.messageCount);
        }

    }
}

Any suggestions would be greatly appreciated, as is the assistance I have received so far. Thank you.

Can you reproduce this with TRACE level logging enabled? If so, can you post the log someplace? (probably too big to post here). - Gary Russell
Thanks Gary, I uploaded the log file to my Google Drive: drive.google.com/file/d/0BzzeZSXQpeQCT0JjdUduSkpfYlE/… In this case, the response that can't be correlated is for message number 77 (locMessageCount:77). - Russell Maytham

1 Answer


Thanks; this is a bug in the combination of the outbound gateway and the caching connection factory. Please open a JIRA issue.

The problem is that the connection is returned to the pool (and reused by another thread) before the first thread (Thread-5) removes its pending reply. Thread-5 then ends up removing the new pending reply (for Thread-2) instead of its own, so when Thread-2's response arrives there is nothing to correlate it with.
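To make the interleaving concrete, here is a minimal, single-threaded Java sketch that replays the steps in order. It assumes the gateway tracks pending replies by connection ID, which is how a reused connection could lead one thread to remove another thread's entry. The `PendingReplyRace` class, the one-slot pool, and the `simulate` method are hypothetical stand-ins, not Spring Integration code. The `false` branch shows one assumed fix: remove the pending reply before returning the connection to the pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PendingReplyRace {

    /**
     * Replays the Thread-5 / Thread-2 interleaving deterministically.
     *
     * @param releaseBeforeRemove true = buggy order (return connection to the pool, then
     *                            remove the pending reply); false = assumed fix (remove first)
     * @return true if Thread-2's pending reply is still registered afterwards
     */
    static boolean simulate(boolean releaseBeforeRemove) {
        // Hypothetical stand-ins: pending replies keyed by connection ID, and a one-slot pool
        Map<String, String> pendingReplies = new ConcurrentHashMap<>();
        Deque<String> pool = new ArrayDeque<>();
        pool.push("conn-1");

        // Thread-5 checks out conn-1, sends its request, and registers a pending reply
        String conn5 = pool.pop();
        pendingReplies.put(conn5, "Thread-5");

        // Thread-5's reply has arrived; now it cleans up
        if (releaseBeforeRemove) {
            pool.push(conn5);                      // connection goes back to the pool too early...
            String conn2 = pool.pop();             // ...and Thread-2 picks up the same connection
            pendingReplies.put(conn2, "Thread-2"); // Thread-2 registers under the same ID
            pendingReplies.remove(conn5);          // Thread-5 removes Thread-2's entry by mistake
        } else {
            pendingReplies.remove(conn5);          // Thread-5 removes its own entry first
            pool.push(conn5);                      // only then is the connection reusable
            String conn2 = pool.pop();
            pendingReplies.put(conn2, "Thread-2");
        }
        // Thread-2's response can only be correlated if its entry still exists
        return "Thread-2".equals(pendingReplies.get("conn-1"));
    }

    public static void main(String[] args) {
        System.out.println("buggy order, Thread-2 entry present: " + simulate(true));
        System.out.println("fixed order, Thread-2 entry present: " + simulate(false));
    }
}
```

With the buggy order, Thread-2's entry is gone by the time its response arrives, which matches the "Cannot correlate response - no pending reply" error; with a single thread there is no second registration to clobber, consistent with the error never appearing in that case.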

Unfortunately, I don't have a simple workaround for you; fixing it requires code changes in the gateway.