I'm in the process of designing a system that will run using a Spring MVC web application. It will be used to send and receive TCP commands to and from an existing (non-Spring) application which is used to control some network data filters. I'm just playing around with the Spring Integration TCP stuff (I'm new to SI and Spring in general) to try and understand it but I'm struggling to get a basic example to work.

I need the communication to be asynchronous, as the server and client ends could send data at any time, and it may or may not need a reply. So I believe what I need to use are collaborating channel adapters rather than gateways.

My demo program should wait for a client to connect and then receive a series of String messages, to which it echoes a reply. The user can also type stuff to send from the server side.

It's based on the tcp-client-server example in the samples. I would like to do it all via Java config rather than XML.

What I would like the demo below to do is echo the incoming data back to the client.

Here's the server config class:

@Configuration()
@EnableIntegration
@IntegrationComponentScan
public class ServerConfiguration implements ApplicationListener<TcpConnectionEvent> {

private final int port = SocketUtils.findAvailableServerSocket(5000);

@MessagingGateway(defaultRequestChannel="toTcp")
public interface Gateway {
    String send(String in);
}

@Bean
public AbstractServerConnectionFactory serverFactory() {
    System.out.println("serverFactory");
    AbstractServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(port);
    return connectionFactory;
}

@Bean MessageChannel toTcp() {
    System.out.println("creating toTcp DirectChannel");
    DirectChannel dc = new DirectChannel();
    dc.setBeanName("toTcp");

    return dc;
}

@Bean
public MessageChannel fromTcp() {
    System.out.println("creating fromTcp DirectChannel");
    DirectChannel dc = new DirectChannel();
    dc.setBeanName("fromTcp");

    return dc;
}

// Inbound channel adapter. This receives the data from the client
@Bean
public TcpReceivingChannelAdapter inboundAdapter(AbstractServerConnectionFactory connectionFactory) {
    System.out.println("Creating inbound adapter");
    TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();

    inbound.setConnectionFactory(connectionFactory);
    inbound.setOutputChannel("fromTcp");

    return inbound;
}

// Outbound channel adapter. This sends the data to the client
@Bean
@ServiceActivator(inputChannel="toTcp")
public TcpSendingMessageHandler outboundAdapter(AbstractServerConnectionFactory connectionFactory) {
    System.out.println("Creating outbound adapter");
    TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
    outbound.setConnectionFactory(connectionFactory);
    return outbound;
}

// Endpoint example 
@MessageEndpoint
public static class Echo {

    // Server
    @Transformer(inputChannel="fromTcp", outputChannel="toEcho")
    public String convert(byte[] bytes) {
        System.out.println("convert: " + new String(bytes));
        return new String(bytes);
    }

    // Server
    @ServiceActivator(inputChannel="toEcho", outputChannel="toTcp")
    public String upCase(String in) {
        System.out.println("upCase: " + in.toUpperCase());
        return in.toUpperCase();
    }
}

@Override
public void onApplicationEvent(TcpConnectionEvent event) {
    System.out.println("Got TcpConnectionEvent: source=" + event.getSource() + 
            ", id=" + event.getConnectionId()); 
}   
}

Here's the main class:

@SpringBootApplication
@IntegrationComponentScan
@EnableMessageHistory
public class SpringIntegrationTcpTest {

    @Autowired
    private ServerConfiguration.Gateway gateway;

    public String send(String data) {
        return gateway.send(data);
    }


public static void main(String[] args) throws IOException {

    ConfigurableApplicationContext context = SpringApplication.run(SpringIntegrationTcpTest.class, args);

    SpringIntegrationTcpTest si = context.getBean(SpringIntegrationTcpTest.class);

    final AbstractServerConnectionFactory crLfServer = context.getBean(AbstractServerConnectionFactory.class);

    final Scanner scanner = new Scanner(System.in);
    System.out.print("Waiting for server to accept connections on port " + crLfServer.getPort());
    TestingUtilities.waitListening(crLfServer, 100000L);
    System.out.println("running.\n\n");

    System.out.println("Please enter some text and press <enter>: ");
    System.out.println("\tNote:");
    System.out.println("\t- Entering FAIL will create an exception");
    System.out.println("\t- Entering q will quit the application");
    System.out.print("\n");

    while (true) {

        final String input = scanner.nextLine();

        if("q".equals(input.trim())) {
            break;
        }
        else {
            final String result = si.send(input);
            System.out.println(result);
        }
    }

    scanner.close();
    context.close();
}
}

And here's the dummy client class:

public class TcpClient {

    public TcpClient() {
    }

    private void connect(String host, int port) throws InterruptedException {
        Socket socket = null;
        Writer out = null;
        BufferedReader in = null;

        try {
            System.out.print("Connecting to " + host + " on port " + port + " ... ");
            socket = new Socket(host, port);
            System.out.println("connected.");

            System.out.println("sending 100 messages");

            out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            for (int i = 1; i < 100; ++i) {
                String msg =  "hello" + i;

                out.write(msg+"\r\n");
                out.flush();
                //System.out.print(msg+"\r\n");

                System.out.println("Waiting for message ...");

                StringBuffer str = new StringBuffer();
                int c;
                while ((c = in.read()) != -1) {
                    str.append((char) c);
                }       

                String response = str.toString();
                System.out.println("got message: " + response);

                Thread.sleep(1000);
            }


        } catch (IOException e) {

            System.err.println("Test ended with an exception: " + port + ", " + e.getMessage());

        } finally {
            try {
                socket.close();
                out.close();
                //in.close();

            } catch (Exception e) {
                // swallow exception
            }

        }       

    }

    public static void main(String[] args) throws InterruptedException {

        String host = args[0];
        int port = Integer.parseInt(args[1]);
        new TcpClient().connect(host, port);
    }

}

I've spent a good deal of time playing with gateways, etc., and got it to sort of work with telnet, and to receive messages from the client using a gateway. What I can't do is get it to work properly with channel adapters.

When the client is started, it sends the string, which is received by the server and printed to the console. Nothing appears to be sent back, as the client just sits on "Waiting for message ...". When sending something from the server side, I get the following exception:

Please enter some text and press <enter>:
        Note:
        - Entering FAIL will create an exception
        - Entering q will quit the application

Got TcpConnectionEvent: source=org.springframework.integration.ip.tcp.connection.TcpNetConnection@67162888, id=127.0.0.1:50940:5000:052bf55b-526a-4ea9-bfe3-8ecc573239a3
convert: hello1
upCase: HELLO1
qwe
2017-01-10 12:09:13.995 ERROR 7296 --- [           main] o.s.i.ip.tcp.TcpSendingMessageHandler    : Unable to find outbound socket for GenericMessage [payload=qwe, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6b5894c8, history=serverConfiguration$Gateway,toTcp,serverConfiguration.outboundAdapter.serviceActivator.handler,outboundAdapter, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6b5894c8, id=a4ea72f2-6b12-379b-1b15-f75b821f0b7f, timestamp=1484050153995}]
Exception in thread "main" org.springframework.messaging.MessageHandlingException: Unable to find outbound socket
        at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)

So the problem is that there's no outbound socket. Where is the outbound socket defined? What else am I doing wrong?

2 Answers

2
votes
  1. You don't need to call setBeanName on the channels - the framework will do that automatically for you.

  2. Your gateway is expecting a reply, but the toTcp channel is connected to a channel adapter, which returns no reply - use a void return type for this scenario.

o.s.i.ip.tcp.TcpSendingMessageHandler : Unable to find outbound socket

To send arbitrary messages to a connected client, you need to tell the adapter which client to send to by setting the ip_connectionId header (there's a constant for it, IpHeaders.CONNECTION_ID).

You can capture the connection ID via the TcpConnectionOpenEvent and add it to the header via the gateway:

void send(@Payload String data, @Header(IpHeaders.CONNECTION_ID) String connectionId);
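As a rough sketch of the capture step, assuming the Spring Integration IP module is on the classpath: the listener below records connection IDs on open events and forgets them on close events, so a later close event cannot leave a stale ID behind. ConnectionTracker and openConnections() are hypothetical names made up for this illustration; only the event classes and getConnectionId() come from the framework.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.context.ApplicationListener;
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnectionEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
import org.springframework.stereotype.Component;

// Hypothetical helper (not part of Spring Integration): tracks the IDs of
// currently open TCP connections so they can be used as ip_connectionId headers.
@Component
public class ConnectionTracker implements ApplicationListener<TcpConnectionEvent> {

    // Thread-safe set, because events may arrive on connection threads.
    private final Set<String> openConnections = ConcurrentHashMap.newKeySet();

    @Override
    public void onApplicationEvent(TcpConnectionEvent event) {
        if (event instanceof TcpConnectionOpenEvent) {
            // A client connected: remember its connection ID.
            openConnections.add(event.getConnectionId());
        } else if (event instanceof TcpConnectionCloseEvent) {
            // The client went away: sending to this ID would now fail.
            openConnections.remove(event.getConnectionId());
        }
        // Other TcpConnectionEvent subtypes are ignored.
    }

    // Returns the live set of open connection IDs.
    public Set<String> openConnections() {
        return openConnections;
    }
}
```

A caller could then loop over openConnections() and invoke the gateway's send(data, connectionId) once per ID, which sends the same text to every connected client.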
0
votes

Thanks for the reply. I have added the changes to the gateway as you describe and stored the client ID from the connection event. It almost works, but the replies are not displayed on the client console until I quit the server program.

This is the server side output:

Please enter some text and press <enter>:
        Note:
        - Entering FAIL will create an exception
        - Entering q will quit the application

client id is 127.0.0.1:58209:5000:ed6b6d48-5de7-4470-ac59-924788cf2957
convert: hello1
upCase: HELLO1
abc
def
ghi
q

and this is the client side output:

Connecting to localhost on port 5000 ... connected.
sending 1000 messages
Waiting for message ...
got message: HELLO1 <-- appears after server quit
abc 
def 
ghi 

The last four lines only appear after the server program has quit.

In the main class I am storing the client ID by:

@Override
public void onApplicationEvent(TcpConnectionEvent event) {
    clientId = event.getConnectionId(); 
    System.out.println("client id is " + clientId);
}

and then sending the messages by

send(data, clientId)

where the gateway is now defined as

@MessagingGateway(defaultRequestChannel="toTcp")
public interface Gateway {
    //void send(String in);
    void send(@Payload String data, @Header(IpHeaders.CONNECTION_ID) String connectionId);
}

The client program is unchanged from the last post.
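
One possible explanation, offered as an assumption rather than a confirmed diagnosis: the client loop reads with in.read() until it returns -1, and -1 only arrives when the server closes the socket, which would match replies showing up only after the server quits. If the server terminates each reply with \r\n (which I believe is what the default serializer of the TCP connection factory does), the client could read one line per reply instead. A minimal sketch in plain Java, with CrLfReplyReader and readReplies as made-up names for illustration:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class CrLfReplyReader {

    /**
     * Reads up to maxReplies line-terminated replies from the reader.
     * Each readLine() call returns as soon as one full line has arrived,
     * so it does not wait for the peer to close the connection.
     * Stops early if the stream ends (readLine() returns null).
     */
    public static List<String> readReplies(BufferedReader reader, int maxReplies) throws IOException {
        List<String> replies = new ArrayList<>();
        String line;
        while (replies.size() < maxReplies && (line = reader.readLine()) != null) {
            replies.add(line); // the line terminator is already stripped
        }
        return replies;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the socket's input stream: two CRLF-terminated replies.
        // With a real socket, wrap socket.getInputStream() in a BufferedReader
        // once and reuse that reader for every reply.
        BufferedReader reader = new BufferedReader(new StringReader("HELLO1\r\nabc\r\n"));
        System.out.println(readReplies(reader, 2)); // prints [HELLO1, abc]
    }
}
```

In the client above, that would mean replacing the inner while loop with a single in.readLine() call per message sent, since in is already a BufferedReader.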