I am trying to create a Tcp server which accept inbound connection, and send message to connected clients asynchronously. There is a sample of Tcp server but it is using gateway, which is request/response, does not support async.
my goal,
- server listen to socket, e.g. 9000
- a tcp client connect to 9000
- server accept connect and receive message. (use
TcpReceivingChannelAdapter
?) - server keep the connection/socket and make note of the ip_connectId header.
- when some event or schedule task produce a message for a client, it looks up the ip_connectId and send a message to that client. (use
TcpSendingMessageHandler
?)
From the reference document, I should use Collaborating Outbound and Inbound Channel Adapters. but there is no java config sample. I don't understand how to do this with java config, especially how and where to look for a client to send.
Do I need two channels? one for inbound and one for outbound? inboundAdapter->fromTcpChannel->consumer producer->outboundAdapter->toTcpChannel
Do I create ServiceActivator
or Endpoint to act as consumer/producer?
Does spring integration keep connections alive by default? and when I need to send message to it, by just adding ip_connectId header to a message?
Do I dispatch the message to client using TcpSendingMessageHandler
or need to implement a gateway
?
Clean up my code and test again after Gary's help, this is my code so far.
@EnableIntegration
@IntegrationComponentScan
@Configuration
public class IntegrationConfig implements
ApplicationListener<TcpConnectionEvent> {
@Value("${listen.port:8000}")
private int port;
@Bean //for accepting text message from TCP, putty
public MessageChannel fromTcp() {
return new DirectChannel();
}
@Bean //for sending text message to TCP client, outbound
public MessageChannel toTcp() {
return new DirectChannel();
}
// receive from MVC controller
@Bean
public MessageChannel invokeChannel() {
return new DirectChannel();
}
@Bean //inbound, it is working, I could read the inbound message while debugging
public TcpReceivingChannelAdapter in(
AbstractServerConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setOutputChannel(fromTcp());
adapter.setConnectionFactory(connectionFactory);
return adapter;
}
//transform TCP bytes to string message, working
@Transformer(inputChannel = "fromTcp", outputChannel = "toCollaborate")
public String convert(byte[] bytes) {
return new String(bytes);
}
MessageHeaders staticheader; //save ip_connectinId, use this to collaborate outbound message later, for testing purpose only
@ServiceActivator(inputChannel = "toCollaborate", outputChannel = "toTcp")
public Message<String> handleTcpMessage(Message<String> stringMsg) {
staticheader = stringMsg.getHeaders();
return stringMsg;
// save the header, collaborate to output channel
}
//collaborate message from REST API invokeChannel to a outbound tcp client, this fail
@Transformer(inputChannel = "invokeChannel", outputChannel = "toTcp")
public Message<String> headerBeforeSend(String test) {
GenericMessage<String> msg = new GenericMessage<String>(
"from rest api");
if (staticheader != null) {
MessageBuilder
.fromMessage(msg)
.setHeader("ip_connectionId",
staticheader.get("ip_connectionId")).build();
}
return msg;
}
@ServiceActivator(inputChannel = "toTcp")
@Bean
public TcpSendingMessageHandler out(
AbstractServerConnectionFactory connectionFactory) {
TcpSendingMessageHandler tcpOutboundAdp = new TcpSendingMessageHandler();
tcpOutboundAdp.setConnectionFactory(connectionFactory);
return tcpOutboundAdp;
}
// should need only 1 factory? and keep connectin alive
// server for in coming connection
@Bean
public AbstractServerConnectionFactory serverCF() {
return new TcpNetServerConnectionFactory(this.port);
}
@Override
public void onApplicationEvent(TcpConnectionEvent tcpEvent) {
// TODO Auto-generated method stub
TcpConnection source = (TcpConnection) tcpEvent.getSource();
}
}
//The MVC controller
@Autowired
MessageChannel invokeChannel;
@RequestMapping(value="/invoke")
public String sayHello()
{
//trigger gateway to send a message
String msg = "hello";
MessagingTemplate template = new MessagingTemplate();
template.send(invokeChannel, new GenericMessage<String>(msg));
return msg;
}
The test result:
1. putty connect ok, send text message
2. SI receive message ok
3. use REST API localhost/webappname/rest/invoke
to send a message to invokeChannel, ok
4. The transformer
set message header
5. exception as follow
exception org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.springframework.messaging.MessageHandlingException: Unable to find outbound socket org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:981) org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:860) javax.servlet.http.HttpServlet.service(HttpServlet.java:622) org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:845) javax.servlet.http.HttpServlet.service(HttpServlet.java:729) org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)