I know Spring Integration has TcpInboundGateway and ByteArrayStxEtxSerializer to handle data coming in over a TCP port.

ByteArrayStxEtxSerializer works great if the TCP server needs to read all the data sent by the client and then process it (request/response model). I am using single-use=false so that multiple requests can be processed on the same connection.

For example, if the client sends 0x02AAPL0x03, the server can send back the AAPL price.

My TCP server works if the client sends 0x02AAPL0x030x02GOOG0x03: it sends back the AAPL price and the GOOG price.

Sometimes clients send EOT (0x04). When a client sends EOT, I would like to close the socket connection.

For example, the client request can be 0x02AAPL0x030x02GOOG0x03 0x020x040x03. Note that EOT arrives in the last packet.

I know the ByteArrayStxEtxSerializer deserializer can be customized to read the bytes sent by the client.

Is the deserializer a good place to close the socket connection? If not, how should the Spring Integration framework be notified to close it?

Please help.

Here is my spring configuration:

<int-ip:tcp-connection-factory id="crLfServer"
        type="server"
        port="${availableServerSocket}"
        single-use="false"
        so-timeout="10000"
        using-nio="false" 
        serializer="connectionSerializeDeserialize"
        deserializer="connectionSerializeDeserialize"
        so-linger="2000"/>

    <bean id="connectionSerializeDeserialize" class="org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer"/>

    <int-ip:tcp-inbound-gateway id="gatewayCrLf"
        connection-factory="crLfServer"
        request-channel="serverBytes2StringChannel"
        error-channel="errorChannel"
        reply-timeout="10000"/> <!-- reply-timeout works on inbound-gateway -->

    <int:channel id="toSA" />

    <int:service-activator input-channel="toSA"
        ref="myService"
        method="prepare"/>

    <int:object-to-string-transformer id="serverBytes2String"
        input-channel="serverBytes2StringChannel"
        output-channel="toSA"/>

    <int:transformer id="errorHandler"
        input-channel="errorChannel"
        expression="payload.failedMessage.payload + ':' + payload.cause.message"/>

UPDATE: Adding throw new SoftEndOfStreamException("Stream closed") to the serializer closes the stream, and I can see the CLOSED log entry in the EventListener. When the server closes the connection, I expect java.io.InputStream.read() to return -1 in the client. Instead, the client receives:

java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:129)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158)
    at sun.nio.cs.StreamDecoder.read0(StreamDecoder.java:107)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:93)
    at java.io.InputStreamReader.read(InputStreamReader.java:151)

Is there anything else needed to close the connection on the server side and propagate the close to the client?

I appreciate your help.

Thank you

1 Answer

The deserializer doesn't have access to the socket, only the input stream. Closing the stream would probably work, but you would likely get a lot of noise in the log.

The best solution is to throw a SoftEndOfStreamException; that signals that the socket should be closed and everything cleaned up.
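
As a rough sketch of where the EOT check could live, here is a plain-Java frame reader, assuming each frame is STX, payload, ETX and that an EOT request is a frame whose payload is the single byte 0x04. The class name StxEtxEotFraming and the exception EndOfTransmissionException are made up for this illustration; the exception only stands in for SoftEndOfStreamException so the example runs without Spring on the classpath. It is not the ByteArrayStxEtxSerializer implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class StxEtxEotFraming {

    public static final int STX = 0x02;
    public static final int ETX = 0x03;
    public static final int EOT = 0x04;

    /** Hypothetical stand-in for SoftEndOfStreamException: "close quietly". */
    public static class EndOfTransmissionException extends IOException {
        public EndOfTransmissionException(String message) {
            super(message);
        }
    }

    /** Reads one STX...ETX frame and returns its payload without the delimiters. */
    public static byte[] readFrame(InputStream in) throws IOException {
        int b = in.read();
        if (b < 0) {
            // Peer closed between frames: treat it as a normal end of stream.
            throw new EndOfTransmissionException("Stream closed between frames");
        }
        if (b != STX) {
            throw new IOException("Expected STX but got 0x" + Integer.toHexString(b));
        }
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        while ((b = in.read()) != ETX) {
            if (b < 0) {
                throw new EOFException("Stream closed in the middle of a frame");
            }
            body.write(b);
        }
        byte[] payload = body.toByteArray();
        if (payload.length == 1 && payload[0] == EOT) {
            // Assumption: a payload consisting only of EOT means "please close".
            throw new EndOfTransmissionException("EOT received");
        }
        return payload;
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = {STX, 'A', 'A', 'P', 'L', ETX, STX, 'G', 'O', 'O', 'G', ETX, STX, EOT, ETX};
        InputStream in = new ByteArrayInputStream(wire);
        try {
            while (true) {
                System.out.println(new String(readFrame(in), StandardCharsets.US_ASCII));
            }
        } catch (EndOfTransmissionException e) {
            System.out.println("closing connection: " + e.getMessage());
        }
    }
}
```

In a real custom deserializer the same check would go inside the deserialize method, throwing SoftEndOfStreamException at the point where this sketch throws EndOfTransmissionException.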

EDIT

Add a listener to detect/log the close...

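import java.net.Socket;

import javax.net.SocketFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;

@SpringBootApplication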
public class So40471456Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So40471456Application.class, args);
        Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
        socket.getOutputStream().write("foo\r\n".getBytes());
        socket.close();
        Thread.sleep(10000);
        context.close();
    }

    @Bean
    public EventListener eventListener() {
        return new EventListener();
    }

    @Bean
    public TcpNetServerConnectionFactory server() {
        return new TcpNetServerConnectionFactory(1234);
    }

    @Bean
    public TcpReceivingChannelAdapter inbound() {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(server());
        adapter.setOutputChannelName("foo");
        return adapter;
    }

    @ServiceActivator(inputChannel = "foo")
    public void syso(byte[] in) {
        System.out.println(new String(in));
    }

    public static class EventListener implements ApplicationListener<TcpConnectionCloseEvent> {

        private final Log logger = LogFactory.getLog(getClass());

        @Override
        public void onApplicationEvent(TcpConnectionCloseEvent event) {
            logger.info(event);
        }

    }

}

With XML, just add a <bean/> for your listener class.

Result:

foo
2016-11-07 16:52:04.133  INFO 29536 --- [pool-1-thread-2] c.e.So40471456Application$EventListener  : TcpConnectionCloseEvent [source=org.springframework.integration.ip.tcp.connection.TcpNetConnection@118a7548], [factory=server, connectionId=localhost:50347:1234:b9fcfaa9-e92c-487f-be59-1ed7ebd9312e] **CLOSED**

EDIT2

It worked as expected for me...

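import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.net.SocketFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;

@SpringBootApplication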
public class So40471456Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So40471456Application.class, args);
        Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
        socket.getOutputStream().write("foo\r\n".getBytes());
        try {
            System.out.println("\n\n\n" + socket.getInputStream().read() + "\n\n\n");
            context.getBean(EventListener.class).latch.await(10, TimeUnit.SECONDS);
        }
        finally {
            socket.close();
            context.close();
        }
    }

    @Bean
    public EventListener eventListener() {
        return new EventListener();
    }

    @Bean
    public TcpNetServerConnectionFactory server() {
        TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(1234);
        server.setDeserializer(is -> {
            throw new SoftEndOfStreamException();
        });
        return server;
    }

    @Bean
    public TcpReceivingChannelAdapter inbound() {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(server());
        adapter.setOutputChannelName("foo");
        return adapter;
    }

    public static class EventListener implements ApplicationListener<TcpConnectionCloseEvent> {

        private final Log logger = LogFactory.getLog(getClass());

        private final CountDownLatch latch = new CountDownLatch(1);

        @Override
        public void onApplicationEvent(TcpConnectionCloseEvent event) {
            logger.info(event);
            latch.countDown();
        }

    }

}

Result:

2016-11-08 08:27:25.964  INFO 86147 --- [           main] com.example2.So40471456Application       : Started So40471456Application in 1.195 seconds (JVM running for 1.764)



-1



2016-11-08 08:27:25.972  INFO 86147 --- [pool-1-thread-2] c.e.So40471456Application$EventListener  : TcpConnectionCloseEvent [source=org.springframework.integration.ip.tcp.connection.TcpNetConnection@fee3774], [factory=server, connectionId=localhost:54984:1234:f79a6826-0336-4823-8844-67054903a094] **CLOSED**
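
To check the client side in isolation, the same behaviour can be reproduced with plain java.net sockets and no Spring at all. The sketch below is only an illustration under assumptions: the class name PeerCloseDemo and the method readAfterPeerClose are hypothetical, the server is a bare ServerSocket on an ephemeral port, and the client sends nothing before the server closes. It shows a blocking InputStream.read() returning -1 once the peer has closed the connection normally.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class PeerCloseDemo {

    /** Returns what the client's first read() yields when the server closes right after accept. */
    public static int readAfterPeerClose() throws Exception {
        try (ServerSocket server = new ServerSocket(0)) { // port 0 = any free port
            Thread closer = new Thread(() -> {
                try (Socket accepted = server.accept()) {
                    // Nothing to do: leaving this block closes the accepted socket.
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            closer.start();

            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort())) {
                client.setSoTimeout(5000); // fail with SocketTimeoutException instead of hanging
                int result = client.getInputStream().read();
                closer.join();
                return result;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("read() returned " + readAfterPeerClose());
    }
}
```

If a client like this sees -1 against a plain server but times out against the real one, that suggests the server-side socket is not actually being closed for that connection, so comparing the connectionId in the CLOSED event with the client's local port is a reasonable next step.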