0
votes

I have a Spring Integration TCP client. I am trying to connect to a remote TCP server and receive data that the server writes to the socket asynchronously.

However, my client receives the first message and then stops receiving any further messages from the server socket. (Looking at the server log, you can see that the client has lost its connection, but why?)

One more thing: how can I trigger certain functionality the moment I receive a message? Maybe via TcpConnectionHandler.handleMessage() or TcpListener.onMessage()?

Ultimately, I would like a Spring TCP client that connects to a remote server and receives data as it arrives on the socket. Below are my configuration and code:

My config:

<bean id="javaSerializer" class="com.my.client.CustomSerializerDeserializer" />
<bean id="javaDeserializer" class="com.my.client.CustomSerializerDeserializer" />

<context:property-placeholder />

<!-- Client side -->

<int:gateway id="gw" service-interface="com.zebra.client.SimpleGateway" default-request-channel="input" default-reply-channel="replies" />

<int-ip:tcp-connection-factory id="client"
    type="client" host="localhost" port="5678" single-use="false"
    so-timeout="100000" serializer="javaSerializer" deserializer="javaDeserializer"
    so-keep-alive="true" />

<int:channel id="input" />

<int:channel id="replies">
    <int:queue />
</int:channel>
<int-ip:tcp-outbound-channel-adapter
    id="outboundClient" channel="input" connection-factory="client" />

<int-ip:tcp-inbound-channel-adapter
    id="inboundClient" channel="replies" connection-factory="client"
    client-mode="true" auto-startup="true" />

My Tcp server:

while(true)
  {
     try
     {
        System.out.println("Waiting for client on port " +
        serverSocket.getLocalPort() + "...");

        Socket server = serverSocket.accept();
        System.out.println("Just connected to "
              + server.getRemoteSocketAddress());

        DataOutputStream out =
             new DataOutputStream(server.getOutputStream());
        out.write("ACK\r\n".getBytes());

        out.flush();

       //server.close();

     }catch(SocketTimeoutException s)
     {
        System.out.println("Socket timed out!");
        break;
     }catch(IOException e)
     {
        e.printStackTrace();
        break;
     } 
  }

Server log:

Waiting for client on port 5678...
Just connected to /127.0.0.1:56108
Waiting for client on port 5678...

My TcpClient:

    final GenericXmlApplicationContext context = new GenericXmlApplicationContext();

    context.load("classpath:config.xml");

    context.registerShutdownHook();
    context.refresh();

    final SimpleGateway gateway = context.getBean(SimpleGateway.class);
    int i=0;

    while (i++ < 10) {
        String h = gateway.receive();
        System.out.println("Received message " + h);
    }

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

My client log:

Received message ACK

Received message
Received message
Received message
Received message
Received message
Received message
Received message
Received message
Received message

My custom deserializer:

@Override
public String deserialize(InputStream inputStream) throws IOException {
    // TODO Auto-generated method stub
    StringBuilder builder = new StringBuilder();


    int size = inputStream.available();

    int c;
    for (int i = 0; i < size; ++i) {
        c = inputStream.read();

        if(c!=-1){
        builder.append((char)c);
        }
        else{
            break;
        }
    }


    return builder.toString();
}

My gateway:

public interface SimpleGateway {

    public String receive();

}

Please let me know if you have more questions.


2 Answers

1
votes

Your server only sends one message and then accepts a new connection.
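To illustrate the point, here is a minimal sketch of a server that keeps writing on the same accepted connection instead of looping back to accept(). The class name MultiMessageServer, the "MSG n" payloads, and the readAll/demo helpers are made up for this illustration and are not from the question. It assumes CRLF-terminated messages, as in the question's "ACK\r\n".

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical example class; not part of the question's code.
final class MultiMessageServer {

    /** Accepts one client and writes `count` CRLF-terminated messages on that same socket. */
    static void serveOnce(ServerSocket serverSocket, int count) throws IOException {
        try (Socket client = serverSocket.accept()) {
            OutputStream out = client.getOutputStream();
            for (int i = 1; i <= count; i++) {
                // Keep using the accepted socket; do not go back to accept() between messages.
                out.write(("MSG " + i + "\r\n").getBytes(StandardCharsets.US_ASCII));
                out.flush();
            }
        }
    }

    /** Plain-socket client used only to check the server: reads lines until the server closes. */
    static List<String> readAll(int port) throws IOException {
        List<String> lines = new ArrayList<>();
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII))) {
            String line;
            while ((line = in.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    /** Runs the server on an ephemeral loopback port and returns what a client received. */
    static List<String> demo() {
        try (ServerSocket serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread server = new Thread(() -> {
                try {
                    serveOnce(serverSocket, 3);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            server.start();
            List<String> lines = readAll(serverSocket.getLocalPort());
            server.join();
            return lines;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling MultiMessageServer.demo() returns the three lines "MSG 1", "MSG 2", "MSG 3", all received over a single connection.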

EDIT

"How can I trigger certain functionality the moment I receive the message?"

I don't really understand that part of the question - you currently have the input channel wired to an outbound channel adapter which simply writes it back out again.

You can do something like this...

<int:object-to-string-transformer input-channel="input" output-channel="next" />

<int:service-activator input-channel="next" method="foo">
    <bean class="foo.Foo" />
</int:service-activator>

public class Foo {

    public void foo(String payload) {
        ...
    }

}

If you want to deal with byte[] you can omit the transformer and use foo(byte[] bytes).
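A rough sketch of what the byte[] variant might look like. The class name ByteFoo and the lastPayload field are hypothetical, and decoding as UTF-8 is an assumption; use whatever charset your server actually writes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical variant of Foo that takes the raw payload.
// For real use, declare it public in its own file so the framework can invoke it.
final class ByteFoo {

    private volatile String lastPayload;

    /** Receives the raw bytes produced by the deserializer. */
    public void foo(byte[] bytes) {
        // Assumption: the payload is UTF-8 text; trim() drops a trailing CRLF if present.
        this.lastPayload = new String(bytes, StandardCharsets.UTF_8).trim();
    }

    /** Returns the most recently received payload, or null if none has arrived. */
    public String getLastPayload() {
        return this.lastPayload;
    }
}
```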

0
votes

I had a similar issue and spent some time reading the Spring Integration source code. Here is what I found.

Here is what Spring does:

  • First thread. When you call any gateway method, Spring calls TcpOutboundGateway.handleRequestMessage, which creates an AsyncReply and stores it in TcpOutboundGateway.pendingReplies. The method then blocks in AsyncReply.getReply(), waiting for a reply.

      Message<?> getReply() {
          try {
              if (!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) {
                  return null;
              }
          }
          catch (@SuppressWarnings("unused") InterruptedException e) {
              Thread.currentThread().interrupt();
          }
          boolean waitForMessageAfterError = true;
          while (this.reply instanceof ErrorMessage) {
              if (waitForMessageAfterError) {
                  /*
                   * Possible race condition with NIO; we might have received the close
                   * before the reply, on a different thread.
                   */
                  logger.debug("second chance");
                  try {
                      this.secondChanceLatch
                              .await(TcpOutboundGateway.this.secondChanceDelay, TimeUnit.SECONDS); // NOSONAR
                  }
                  catch (@SuppressWarnings("unused") InterruptedException e) {
                      Thread.currentThread().interrupt();
                      doThrowErrorMessagePayload();
                  }
                  waitForMessageAfterError = false;
              }
              else {
                  doThrowErrorMessagePayload();
              }
          }
          return this.reply;
      }
    
  • Second thread. Spring runs the run() method of TcpNetConnection:

      while (true) {
          if (!receiveAndProcessMessage()) {
              break;
          }
      }
    

This method eventually calls TcpOutboundGateway.onMessage():

   // some code here
    TcpOutboundGateway.AsyncReply reply = this.pendingReplies.get(connectionId);
    // some code here

If reply != null, it releases the first thread and the gateway method returns the data.

So there are two cases that can happen:

  1. Normal case: the first thread creates the AsyncReply, and the second thread then releases it. In this case, you get messages back from the gateway method normally.
  2. Abnormal case: the second thread receives a message before the AsyncReply has been created, so there is nothing to release (your case). In this case, you cannot get the data back from the gateway method.

The reason you still got the first message is that, on your first gateway call, Spring had not yet started the second thread. That is why the first call falls into the normal case.

Solution: in TcpOutboundGateway.onMessage():
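The two cases can be sketched with plain java.util.concurrent types. This is a simplified illustration of the correlation idea described above, not Spring's actual code: the names ReplyCorrelationSketch, PendingReply, and expectReply are made up, and unlike the real gateway this sketch removes the pending entry when the message arrives.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of reply correlation; not the Spring Integration implementation.
final class ReplyCorrelationSketch {

    /** Holder that the calling thread blocks on until the reader thread supplies a reply. */
    static final class PendingReply {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String reply;

        void setReply(String reply) {
            this.reply = reply;
            this.latch.countDown();
        }

        /** Returns the reply, or null if none arrived within the timeout. */
        String getReply(long timeoutMillis) {
            try {
                return this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS) ? this.reply : null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    private final Map<String, PendingReply> pendingReplies = new ConcurrentHashMap<>();

    /** Messages that arrived while no caller was waiting on that connection. */
    final List<String> unsolicited = new CopyOnWriteArrayList<>();

    /** Caller side ("first thread"): register interest before waiting for the reply. */
    PendingReply expectReply(String connectionId) {
        PendingReply pending = new PendingReply();
        this.pendingReplies.put(connectionId, pending);
        return pending;
    }

    /** Reader side ("second thread"): hand the message to a waiter, or treat it as unsolicited. */
    boolean onMessage(String connectionId, String payload) {
        PendingReply pending = this.pendingReplies.remove(connectionId);
        if (pending == null) {
            // Abnormal case: nobody is waiting, so the message cannot be correlated.
            this.unsolicited.add(payload);
            return false;
        }
        // Normal case: release the waiting caller.
        pending.setReply(payload);
        return true;
    }
}
```

If onMessage runs before expectReply has registered a waiter, the payload lands in unsolicited and a later getReply call times out with null, which mirrors the abnormal case.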

 if (reply == null) {
            if (message instanceof ErrorMessage) {
                /*
                 * Socket errors are sent here so they can be conveyed to any waiting thread.
                 * If there's not one, simply ignore.
                 */
                return false;
            }
            else {
                if (unsolicitedSupported(message)) {
                    return false;
                }
                String errorMessage = "Cannot correlate response - no pending reply for " + connectionId;
                logger.error(errorMessage);
                publishNoConnectionEvent(message, connectionId, errorMessage);
                return false;
            }
        }

If Spring can't find a pending reply, it sends the message to the unsolicited message channel, provided one is configured. You just need to define that channel and a ServiceActivator for it, e.g.:

@Bean
@ServiceActivator(inputChannel = "toTcp")
public MessageHandler tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
    TcpOutboundGateway gate = new TcpOutboundGateway();
    gate.setRemoteTimeout(100000000);
    gate.setUnsolicitedMessageChannelName("unSolicited");
    gate.setConnectionFactory(connectionFactory);
    return gate;
}

@Bean
@ServiceActivator(inputChannel = "unSolicited")
public MessageHandler resolveFirstMessage() {
    return message -> {
        System.out.println("Received message "+message);
    };
}