
We have created an IntegrationFlow for an inbound TCP server. In testing, the integration works fine until the client disconnects; after that we get a continuous stream of empty messages.

Spring Integration 5.2.5.RELEASE

Flow:

@Bean
public IntegrationFlow inboundFlow(MyService myService) {
   return IntegrationFlows.from(
       Tcp.inboundAdapter(
           Tcp.netServer(12345)
               .deserializer(TcpCodecs.lengthHeader4())))
       .transform(Transformers.objectToString())
       .log(INFO) // Logging only for testing 
       .handle(myService, "process") // Do some processing of the message
       .get();
}

Test method:

public void sendPackets() throws Exception {
    List<Path> files = getAllFilesSorted("/some/location");
    try (Socket socket = new Socket("localhost", 12345)) {
        log.info("local port: {}", socket.getLocalPort());
        try (OutputStream outputStream = socket.getOutputStream()) {
            for (int i = 0; i < 10; i++) {
                Path path = files.get(i);
                log.info("{} ({} of {})", path.toString(), i, files.size());
                byte[] bytes = Files.readAllBytes(path);
                outputStream.write(bytes);
                outputStream.flush();
                Thread.sleep(200);
            }
        }
    }
}

Partial Log after test completes:

2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=813ccf6b-9eb7-2ff3-0f8d-f2810966d8d7, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=1fd5c4c7-b2e0-6a89-7de6-894c8e42e242, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=78b852e8-0094-f8b9-8d05-37bc4912d096, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=4e4a7ee0-c66e-9501-9ef7-4d25143531b3, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=c90abe17-7036-94cc-af59-3b0cac5e75e4, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=425d2e2a-75c5-0184-06da-6d27eb546931, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=4c55d1d0-7421-1832-23ca-03744419c847, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=05b6fea6-8f76-5c74-e4cc-33bd8099d8e8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=9b8fe16c-07f4-b64b-faec-794eb743559c, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=222725e8-19c6-62b6-8ba7-103892191c96, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=951403b6-18a9-ffe9-29b7-67b8c01ea311, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=2ac3002b-ccd1-9506-f3aa-60f4b63e26b8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=dbbf12cf-0a27-407b-0a96-5b27ab0539db, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=3282d09e-9508-b053-3763-cc23d3c817a8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.966  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=549b2326-341f-19c0-db98-6b31da1901d8, ip_hostname=localhost, timestamp=1588672852966}]
I don't see how that's possible; enable DEBUG logging for org.springframework.integration. If that doesn't explain what's happening, post the log someplace (like pastebin) and comment here that you have done so. You should also edit the question to show what version of Spring Integration you are using. - Gary Russell
@GaryRussell Uploaded log with DEBUG enabled to pastebin.com/Y6MRpyTb - you can see on line 137 the first empty message. I don't see anything around this that might indicate what is causing this. - hawkeye-73
com.hawkeyeinnovations.tennis.scoringprovider.config.SmartDirectorTCPIT$TestClientConfig'; from source: 'bean method client'. Looks like you have some client config for that test environment. Probably it connects to the same server... - Artem Bilan
@ArtemBilan Yes, I forgot to remove that client. Ran again with it removed and the behaviour is the same. If I extract the test sendPackets method to its own Java app I get the same behaviour (I had been running it in a @SpringBootTest for expediency) - hawkeye-73
@ArtemBilan Created a simple project - github.com/marksilcox/SO61610809 - hawkeye-73

1 Answer

There is nothing in the log about the client disconnecting. We would see socket close activity in the log.

The log implies we are receiving packets containing 0x00000000.
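
To illustrate why a header of 0x00000000 would appear in the log as an empty payload, here is a minimal sketch. It assumes a 4-byte big-endian length header followed by the payload, which is the framing TcpCodecs.lengthHeader4() expects; the class and method names are hypothetical and this is not the framework's own code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ZeroLengthHeaderDemo {

    // Hypothetical helper: decodes one frame made of a 4-byte length header plus payload.
    static String decode(byte[] frame) {
        ByteBuffer buffer = ByteBuffer.wrap(frame); // ByteBuffer is big-endian by default
        int length = buffer.getInt();               // 0x00000000 -> 0
        byte[] payload = new byte[length];
        buffer.get(payload);                        // reads nothing when length is 0
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String payload = decode(new byte[] {0, 0, 0, 0});
        System.out.println("payload=" + payload + " (length " + payload.length() + ")");
    }
}
```

Each such frame becomes one message with an empty payload, which matches the "payload=," entries in the log above.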

Use Wireshark or a similar tool to look at the activity on the wire.

EDIT

Your custom deserializer is causing the problem.

        if (read < 0) {
            return 0;
        }

You need to throw a SoftEndOfStreamException when you detect a socket close.

This is explained in the documentation.

When the deserializer detects a closed input stream between messages, it must throw a SoftEndOfStreamException; this is a signal to the framework to indicate that the close was "normal". If the stream is closed while decoding a message, some other exception should be thrown instead.
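
As a rough sketch of that rule, the method below reads one message framed by a 4-byte big-endian length header and distinguishes the two kinds of close. The question's custom deserializer is not shown in full, so this is not a fix of that exact code. To keep the example free of framework dependencies it uses a local stand-in exception (SoftEndOfStreamStandIn, a hypothetical name); in a real Deserializer you would throw org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException instead.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class LengthHeaderDeserializerSketch {

    // Local stand-in (hypothetical) for Spring Integration's SoftEndOfStreamException.
    static class SoftEndOfStreamStandIn extends IOException {
        SoftEndOfStreamStandIn(String message) {
            super(message);
        }
    }

    // Reads one message: a 4-byte big-endian length header, then that many payload bytes.
    static byte[] deserialize(InputStream in) throws IOException {
        int length = 0;
        for (int i = 0; i < 4; i++) {
            int b = in.read();
            if (b < 0) {
                if (i == 0) {
                    // Closed between messages: signal a normal close instead of returning 0.
                    throw new SoftEndOfStreamStandIn("Stream closed between messages");
                }
                // Closed part-way through the header: this is a real error.
                throw new EOFException("Stream closed inside the length header");
            }
            length = (length << 8) | b;
        }
        if (length < 0) {
            throw new IOException("Negative message length: " + length);
        }
        byte[] payload = new byte[length];
        int offset = 0;
        while (offset < length) {
            int read = in.read(payload, offset, length - offset);
            if (read < 0) {
                // Closed while decoding a message: also a real error.
                throw new EOFException("Stream closed after " + offset + " of " + length + " payload bytes");
            }
            offset += read;
        }
        return payload;
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[] {0, 0, 0, 2, 'h', 'i'});
        System.out.println(new String(deserialize(in), StandardCharsets.US_ASCII));
        try {
            deserialize(in); // the stream is now exhausted, as after a client disconnect
        } catch (SoftEndOfStreamStandIn e) {
            System.out.println("normal close: " + e.getMessage());
        }
    }
}
```

Returning 0 at end of stream, as in the snippet above, gives the caller no way to tell a closed socket from a zero-length message, which would explain the endless empty payloads.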