I'm in the process of designing a system that will run using a Spring MVC web application. It will be used to send and receive TCP commands to and from an existing (non-Spring) application which is used to control some network data filters. I'm just playing around with the Spring Integration TCP stuff (I'm new to SI and Spring in general) to try and understand it but I'm struggling to get a basic example to work.
I need the communication to be asynchronous, as the server and client ends could send data at any time, and it may or may not need a reply. So I believe what I need to use are collaborating channel adapters rather than gateways.
My demo program should wait for a client to connect and then receive a series of String messages, to which it echoes a reply. The user can also type stuff to send from the server side.
It's based on the tcp-client-server example in the samples. I would like to do it all via Java config rather than XML.
What I would like the demo below to do is echo the incoming data back to the client.
Here's the server config class:
    @Configuration()
    @EnableIntegration
    @IntegrationComponentScan
    public class ServerConfiguration implements ApplicationListener<TcpConnectionEvent> {

        private final int port = SocketUtils.findAvailableServerSocket(5000);

        @MessagingGateway(defaultRequestChannel="toTcp")
        public interface Gateway {
            String send(String in);
        }

        @Bean
        public AbstractServerConnectionFactory serverFactory() {
            System.out.println("serverFactory");
            AbstractServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(port);
            return connectionFactory;
        }

        @Bean
        MessageChannel toTcp() {
            System.out.println("creating toTcp DirectChannel");
            DirectChannel dc = new DirectChannel();
            dc.setBeanName("toTcp");
            return dc;
        }

        @Bean
        public MessageChannel fromTcp() {
            System.out.println("creating fromTcp DirectChannel");
            DirectChannel dc = new DirectChannel();
            dc.setBeanName("fromTcp");
            return dc;
        }

        // Inbound channel adapter. This receives the data from the client
        @Bean
        public TcpReceivingChannelAdapter inboundAdapter(AbstractServerConnectionFactory connectionFactory) {
            System.out.println("Creating inbound adapter");
            TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
            inbound.setConnectionFactory(connectionFactory);
            inbound.setOutputChannel("fromTcp");
            return inbound;
        }

        // Outbound channel adapter. This sends the data to the client
        @Bean
        @ServiceActivator(inputChannel="toTcp")
        public TcpSendingMessageHandler outboundAdapter(AbstractServerConnectionFactory connectionFactory) {
            System.out.println("Creating outbound adapter");
            TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
            outbound.setConnectionFactory(connectionFactory);
            return outbound;
        }

        // Endpoint example
        @MessageEndpoint
        public static class Echo {

            // Server
            @Transformer(inputChannel="fromTcp", outputChannel="toEcho")
            public String convert(byte[] bytes) {
                System.out.println("convert: " + new String(bytes));
                return new String(bytes);
            }

            // Server
            @ServiceActivator(inputChannel="toEcho", outputChannel="toTcp")
            public String upCase(String in) {
                System.out.println("upCase: " + in.toUpperCase());
                return in.toUpperCase();
            }
        }

        @Override
        public void onApplicationEvent(TcpConnectionEvent event) {
            System.out.println("Got TcpConnectionEvent: source=" + event.getSource() +
                    ", id=" + event.getConnectionId());
        }
    }
Here's the main class:
    @SpringBootApplication
    @IntegrationComponentScan
    @EnableMessageHistory
    public class SpringIntegrationTcpTest {

        @Autowired
        private ServerConfiguration.Gateway gateway;

        public String send(String data) {
            return gateway.send(data);
        }

        public static void main(String[] args) throws IOException {
            ConfigurableApplicationContext context = SpringApplication.run(SpringIntegrationTcpTest.class, args);
            SpringIntegrationTcpTest si = context.getBean(SpringIntegrationTcpTest.class);
            final AbstractServerConnectionFactory crLfServer = context.getBean(AbstractServerConnectionFactory.class);
            final Scanner scanner = new Scanner(System.in);
            System.out.print("Waiting for server to accept connections on port " + crLfServer.getPort());
            TestingUtilities.waitListening(crLfServer, 100000L);
            System.out.println("running.\n\n");
            System.out.println("Please enter some text and press <enter>: ");
            System.out.println("\tNote:");
            System.out.println("\t- Entering FAIL will create an exception");
            System.out.println("\t- Entering q will quit the application");
            System.out.print("\n");
            while (true) {
                final String input = scanner.nextLine();
                if ("q".equals(input.trim())) {
                    break;
                }
                else {
                    final String result = si.send(input);
                    System.out.println(result);
                }
            }
            scanner.close();
            context.close();
        }
    }
And here's the dummy client class:
    public class TcpClient {

        public TcpClient() {
        }

        private void connect(String host, int port) throws InterruptedException {
            Socket socket = null;
            Writer out = null;
            BufferedReader in = null;
            try {
                System.out.print("Connecting to " + host + " on port " + port + " ... ");
                socket = new Socket(host, port);
                System.out.println("connected.");
                System.out.println("sending 100 messages");
                out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                // i runs from 1 to 100 inclusive so that 100 messages are sent, as announced above
                for (int i = 1; i <= 100; ++i) {
                    String msg = "hello" + i;
                    out.write(msg+"\r\n");
                    out.flush();
                    //System.out.print(msg+"\r\n");
                    System.out.println("Waiting for message ...");
                    StringBuffer str = new StringBuffer();
                    int c;
                    while ((c = in.read()) != -1) {
                        str.append((char) c);
                    }
                    String response = str.toString();
                    System.out.println("got message: " + response);
                    Thread.sleep(1000);
                }
            } catch (IOException e) {
                System.err.println("Test ended with an exception: " + port + ", " + e.getMessage());
            } finally {
                try {
                    socket.close();
                    out.close();
                    //in.close();
                } catch (Exception e) {
                    // swallow exception
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            String host = args[0];
            int port = Integer.parseInt(args[1]);
            new TcpClient().connect(host, port);
        }
    }
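A note on the read loop in the client above: `in.read()` only returns -1 once the peer closes the connection, so if the server keeps the socket open the loop never finishes, whether or not a reply has arrived. The sketch below is a stdlib-only illustration of reading one CRLF-terminated reply with `readLine()` instead. It assumes replies are CRLF-terminated; `CrLfReadDemo`, `startEchoServer` and `roundTrip` are hypothetical names, and the tiny in-process echo server is only a stand-in for the real Spring Integration server.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

final class CrLfReadDemo {

    // Hypothetical stand-in for the server: upper-cases each CRLF-terminated line
    // and keeps the connection open, so the client never sees end-of-stream (-1).
    static void startEchoServer(ServerSocket server) {
        Thread t = new Thread(() -> {
            try (Socket s = server.accept();
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
                 Writer out = new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8)) {
                String line;
                while ((line = in.readLine()) != null) {
                    out.write(line.toUpperCase() + "\r\n");
                    out.flush();
                }
            } catch (IOException e) {
                // connection closed; nothing more to do in this sketch
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Sends one CRLF-terminated message and reads exactly one reply line.
    static String roundTrip(String msg) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            startEchoServer(server);
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort())) {
                socket.setSoTimeout(5000); // fail instead of hanging if no reply arrives
                Writer out = new BufferedWriter(
                        new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
                out.write(msg + "\r\n");
                out.flush();
                // readLine() returns as soon as a line terminator arrives;
                // it does not wait for the peer to close the connection.
                return in.readLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("got message: " + roundTrip("hello1"));
    }
}
```

The read timeout is a design choice for the sketch: it turns a silent hang into a visible `SocketTimeoutException`, which makes it easier to tell "no reply was sent" apart from "the reply was sent but the reader is still waiting for end-of-stream".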
I've spent a good deal of time playing with gateways, etc., and got it to sort of work with telnet, and to receive messages from the client using a gateway. What I can't do is get it to work properly with channel adapters.
When the client is started, it sends a string, which the server receives and prints to the console. Nothing appears to be sent back, as the client just sits on "Waiting for message ...". When I send something from the server side, I get the following exception:
    Please enter some text and press <enter>:
        Note:
        - Entering FAIL will create an exception
        - Entering q will quit the application
    Got TcpConnectionEvent: source=org.springframework.integration.ip.tcp.connection.TcpNetConnection@67162888, id=127.0.0.1:50940:5000:052bf55b-526a-4ea9-bfe3-8ecc573239a3
    convert: hello1
    upCase: HELLO1
    qwe
    2017-01-10 12:09:13.995 ERROR 7296 --- [ main] o.s.i.ip.tcp.TcpSendingMessageHandler : Unable to find outbound socket for GenericMessage [payload=qwe, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6b5894c8, history=serverConfiguration$Gateway,toTcp,serverConfiguration.outboundAdapter.serviceActivator.handler,outboundAdapter, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6b5894c8, id=a4ea72f2-6b12-379b-1b15-f75b821f0b7f, timestamp=1484050153995}]
    Exception in thread "main" org.springframework.messaging.MessageHandlingException: Unable to find outbound socket
        at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
So the problem is that there's no outbound socket. Where is the outbound socket defined? What else am I doing wrong?
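A possible reading of the error, sketched under assumptions: the headers logged for the failing message (payload=qwe) contain no `ip_connectionId`, and with a server connection factory the outbound adapter picks the socket by that header, so a message that comes from the gateway rather than from an inbound connection has nothing to be routed to. The plain-Java model below only illustrates that lookup. It is not Spring Integration code; `OutboundLookupSketch` and its methods are hypothetical names, and the `StringBuilder` stands in for a real socket.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class OutboundLookupSketch {

    // Header name that Spring Integration exposes as IpHeaders.CONNECTION_ID.
    static final String CONNECTION_ID = "ip_connectionId";

    // Hypothetical registry: connection id -> data written to that connection.
    private final Map<String, StringBuilder> connections = new ConcurrentHashMap<>();

    // Called when a client connects (compare the TcpConnectionEvent in the log).
    void onConnectionOpened(String connectionId) {
        connections.put(connectionId, new StringBuilder());
    }

    // Routes the payload to the connection named in the headers, or fails.
    void send(String payload, Map<String, Object> headers) {
        Object id = headers.get(CONNECTION_ID);
        StringBuilder connection = (id == null) ? null : connections.get(id);
        if (connection == null) {
            throw new IllegalStateException("Unable to find outbound socket");
        }
        connection.append(payload).append("\r\n");
    }

    // Returns everything written so far to the given connection.
    String written(String connectionId) {
        StringBuilder sb = connections.get(connectionId);
        return sb == null ? "" : sb.toString();
    }

    public static void main(String[] args) {
        OutboundLookupSketch handler = new OutboundLookupSketch();
        String id = "127.0.0.1:50940:5000:052bf55b-526a-4ea9-bfe3-8ecc573239a3";
        handler.onConnectionOpened(id);

        // Echo path: the header from the inbound message travels with the reply.
        Map<String, Object> echoHeaders = new HashMap<>();
        echoHeaders.put(CONNECTION_ID, id);
        handler.send("HELLO1", echoHeaders);
        System.out.println("echo path wrote: " + handler.written(id).trim());

        // Gateway path: no connection id header, so there is no socket to pick.
        try {
            handler.send("qwe", new HashMap<>());
        } catch (IllegalStateException e) {
            System.out.println("gateway path failed: " + e.getMessage());
        }
    }
}
```

If this reading is right, a server-initiated message would need to carry the id of an open connection (for example the one reported by `event.getConnectionId()` in the listener) in that header before it reaches the outbound adapter.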