I'm trying to implement a TCP client/server application with Spring Integration where I need to open one TCP client socket per incoming TCP server connection.

Basically, I have a bunch of IoT devices that communicate with a backend server over raw TCP sockets. I need to add extra features to the system, but the software on both the devices and the server is closed source, so I can't change either. My thought was to place middleware between the devices and the server that intercepts this client/server communication and provides the added functionality.

I'm using a TcpNioServerConnectionFactory and a TcpNioClientConnectionFactory with inbound/outbound channel adapters to send/receive messages to/from all parties. But there's no information in the message structure that binds a message to a certain device; therefore I have to open a new client socket to the backend every time a new connection from a new device comes on the server socket. This client connection must be bound to that specific server socket's lifecycle. It must never be reused and if this client socket (backend to middleware) dies for any reason, the server socket (middleware to device) must also be closed. How can I go about this?

Edit: My first thought was to subclass AbstractClientConnectionFactory, but it appears that it doesn't do anything except provide a client connection when asked. Should I instead look into subclassing the inbound/outbound channel adapters, or elsewhere? I'm also open to solutions outside Spring Integration, such as Apache Camel or even a custom solution with raw NIO sockets.

Edit 2: I got halfway there by switching to TcpNetServerConnectionFactory and wrapping the client factory in a ThreadAffinityClientConnectionFactory; the devices can now reach the backend fine. But when the backend sends something back, I get the error "Unable to find outbound socket for GenericMessage" and the client socket dies. I think it's because the message coming from the backend doesn't carry the header needed to route it to the right device connection. How can I capture this info? My configuration class is as follows:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class ServerConfiguration {

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        AbstractServerConnectionFactory factory = new TcpNetServerConnectionFactory(8000);
        factory.setSerializer(new MapJsonSerializer());
        factory.setDeserializer(new MapJsonSerializer());
        return factory;
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory factory = new TcpNioClientConnectionFactory("localhost", 3333);
        factory.setSerializer(new MapJsonSerializer());
        factory.setDeserializer(new MapJsonSerializer());
        factory.setSingleUse(true);
        return new ThreadAffinityClientConnectionFactory(factory);
    }

    @Bean
    public TcpReceivingChannelAdapter inboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
        inbound.setConnectionFactory(connectionFactory);
        return inbound;
    }

    @Bean
    public TcpSendingMessageHandler outboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
        outbound.setConnectionFactory(connectionFactory);
        return outbound;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
        inbound.setConnectionFactory(connectionFactory);
        return inbound;
    }

    @Bean
    public TcpSendingMessageHandler outboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
        outbound.setConnectionFactory(connectionFactory);
        return outbound;
    }

    @Bean
    public IntegrationFlow backendIntegrationFlow() {
        return IntegrationFlows.from(inboundBackendAdapter(clientFactory()))
                .log(LoggingHandler.Level.INFO)
                .handle(outboundDeviceAdapter(serverFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow deviceIntegrationFlow() {
        return IntegrationFlows.from(inboundDeviceAdapter(serverFactory()))
                .log(LoggingHandler.Level.INFO)
                .handle(outboundBackendAdapter(clientFactory()))
                .get();
    }
}

1 Answer

It's not entirely clear what you are asking, so I am going to assume you want a Spring Integration proxy between your devices and the backend server. Something like:

iot-device -> spring server -> message-transformation -> spring client -> back-end-server

If that's the case, you can implement a ClientConnectionIdAware client connection factory that wraps a standard factory.

In the integration flow, bind the incoming ip_connectionId header in a message to the thread (in a ThreadLocal).

Then, in the client connection factory, look up the corresponding outgoing connection in a Map using the ThreadLocal value; if not found (or closed), create a new one and store it in the map for future reuse.

Implement an ApplicationListener (or @EventListener) to listen for TcpConnectionCloseEvents from the server connection factory and close() the corresponding outbound connection.

This sounds like a cool enhancement so consider contributing it back to the framework.
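
The bookkeeping described above can be sketched in plain Java, with no Spring types, assuming connection IDs are plain strings such as the value of the ip_connectionId header. ConnectionPairRegistry, its method names, and the opener callback are hypothetical names used only for illustration; they are not part of Spring Integration. A real implementation would wrap an actual client connection factory and close real TcpConnection objects instead of returning IDs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper: pairs each device-side connection ID with exactly one backend-side ID. */
final class ConnectionPairRegistry {

    // Device connection ID bound to the thread that is currently handling a device message.
    private static final ThreadLocal<String> CURRENT_DEVICE_ID = new ThreadLocal<>();

    private final Map<String, String> deviceToBackend = new ConcurrentHashMap<>();
    private final Map<String, String> backendToDevice = new ConcurrentHashMap<>();

    /** Call at the start of the flow with the incoming connection ID header value. */
    void bind(String deviceConnectionId) {
        CURRENT_DEVICE_ID.set(deviceConnectionId);
    }

    /** Call when the flow finishes so a reused thread does not keep a stale ID. */
    void unbind() {
        CURRENT_DEVICE_ID.remove();
    }

    /** Returns the backend ID paired with the bound device, opening a new one only if none exists. */
    String backendFor(Function<String, String> opener) {
        String deviceId = CURRENT_DEVICE_ID.get();
        if (deviceId == null) {
            throw new IllegalStateException("No device connection is bound to this thread");
        }
        return deviceToBackend.computeIfAbsent(deviceId, id -> {
            String backendId = opener.apply(id);   // stands in for opening a real socket
            backendToDevice.put(backendId, id);
            return backendId;
        });
    }

    /** Reverse lookup, used to route a backend reply to the right device connection. */
    String deviceFor(String backendConnectionId) {
        return backendToDevice.get(backendConnectionId);
    }

    /** Removes the pairing when either side closes; returns the peer ID to close, or null. */
    String remove(String closedId) {
        String backendId = deviceToBackend.remove(closedId);
        if (backendId != null) {
            backendToDevice.remove(backendId);
            return backendId;
        }
        String deviceId = backendToDevice.remove(closedId);
        if (deviceId != null) {
            deviceToBackend.remove(deviceId);
        }
        return deviceId;
    }
}

final class ConnectionPairDemo {
    public static void main(String[] args) {
        ConnectionPairRegistry registry = new ConnectionPairRegistry();
        registry.bind("device-1");
        try {
            String backendId = registry.backendFor(deviceId -> "backend-for-" + deviceId);
            System.out.println("device-1 is paired with " + backendId);
            System.out.println("reply from " + backendId + " goes to " + registry.deviceFor(backendId));
        } finally {
            registry.unbind();
        }
        System.out.println("closing device-1 also closes " + registry.remove("device-1"));
    }
}
```

In a real flow, bind(...) would presumably be called with the ip_connectionId header before sending to the backend, and remove(...) from a TcpConnectionCloseEvent listener, which then closes whichever peer ID comes back. Because remove(...) accepts an ID from either side, the same listener can cover the case where the backend connection dies first.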

EDIT

Version 5.0 added the ThreadAffinityClientConnectionFactory, which would work out of the box with a TcpNetServerConnectionFactory since each connection gets its own thread.

With a TcpNioServerConnectionFactory you would need the extra logic to dynamically bind the connection to the thread for each request.
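
To illustrate what "bind the connection to the thread for each request" could look like when threads are shared, here is a rough plain-Java sketch. It assumes that messages from different connections may be handled by any thread of a small pool, so the binding has to be set before each request and cleared afterwards. PerRequestBinding and withConnection are hypothetical names, not Spring Integration API, and the fixed thread pool only stands in for the NIO factory's task executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: binds a connection ID to the current thread for one request only. */
final class PerRequestBinding {

    private static final ThreadLocal<String> BOUND_CONNECTION = new ThreadLocal<>();

    /** The connection ID bound to the calling thread, or null if none is bound. */
    static String currentConnection() {
        return BOUND_CONNECTION.get();
    }

    /** Binds the ID, runs the work, and always clears the binding before returning. */
    static <T> T withConnection(String connectionId, Supplier<T> work) {
        BOUND_CONNECTION.set(connectionId);
        try {
            return work.get();
        } finally {
            BOUND_CONNECTION.remove();
        }
    }
}

final class PerRequestBindingDemo {
    public static void main(String[] args) throws Exception {
        // Two threads serve three connections, so threads are necessarily shared.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<String>> results = new ArrayList<>();
        for (String id : new String[] {"device-A", "device-B", "device-C"}) {
            Callable<String> task = () -> PerRequestBinding.withConnection(id,
                    () -> Thread.currentThread().getName() + " handles "
                            + PerRequestBinding.currentConnection());
            results.add(pool.submit(task));
        }
        for (Future<String> result : results) {
            System.out.println(result.get());
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The try/finally is the important part: without it, a pooled thread would carry the previous request's connection ID into the next, unrelated request.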

EDIT2

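import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.net.SocketFactory;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
import org.springframework.integration.ip.tcp.connection.ThreadAffinityClientConnectionFactory;

@SpringBootApplication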
public class So51200675Application {

    public static void main(String[] args) {
        SpringApplication.run(So51200675Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("foo\r\n".getBytes());
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());
            socket.close();
        };
    }

    @Bean
    public Map<String, String> fromToConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public Map<String, String> toFromConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public IntegrationFlow proxyInboundFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
                .transform(Transformers.objectToString())
                .<String, String>transform(s -> s.toUpperCase())
                .handle((p, h) -> {
                    mapConnectionIds(h);
                    return p;
                })
                .handle(Tcp.outboundAdapter(threadConnectionFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow proxyOutboundFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
                .transform(Transformers.objectToString())
                .<String, String>transform(s -> s.toUpperCase())
                .enrichHeaders(e -> e
                        .headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
                                + IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
                .handle(Tcp.outboundAdapter(serverFactory()))
                .get();
    }

    private void mapConnectionIds(Map<String, Object> h) {
        try {
            TcpConnection connection = threadConnectionFactory().getConnection();
            String mapping = toFromConnectionMappings().get(connection.getConnectionId());
            String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
            if (mapping == null || !(mapping.equals(incomingCID))) {
                System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
                toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
                fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
        return new ThreadAffinityClientConnectionFactory(clientFactory()) {

            @Override
            public boolean isSingleUse() {
                return false;
            }

        };
    }

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        return Tcp.netServer(1234).get();
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235).get();
        clientFactory.setSingleUse(true);
        return clientFactory;
    }

    @Bean
    public IntegrationFlow serverFlow() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)))
                .transform(Transformers.objectToString())
                .<String, String>transform(p -> p + p)
                .get();
    }

    @Bean
    public ApplicationListener<TcpConnectionCloseEvent> closer() {
        return e -> {
            if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
                String key = fromToConnectionMappings().remove(e.getConnectionId());
                toFromConnectionMappings().remove(key);
                System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
                threadConnectionFactory().releaseConnection();
            }
        };
    }

}

EDIT3

Works fine for me with a MapJsonSerializer.

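import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.net.SocketFactory;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
import org.springframework.integration.ip.tcp.connection.ThreadAffinityClientConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;

@SpringBootApplication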
public class So51200675Application {

    public static void main(String[] args) {
        SpringApplication.run(So51200675Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("{\"foo\":\"bar\"}\n".getBytes());
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());
            socket.close();
        };
    }

    @Bean
    public Map<String, String> fromToConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public Map<String, String> toFromConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public MapJsonSerializer serializer() {
        return new MapJsonSerializer();
    }

    @Bean
    public IntegrationFlow proxyRequestFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo").toUpperCase());
                    return m;
                })
                .handle((p, h) -> {
                    mapConnectionIds(h);
                    return p;
                })
                .handle(Tcp.outboundAdapter(threadConnectionFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow proxyReplyFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo").toLowerCase() + m.get("foo"));
                    return m;
                })
                .enrichHeaders(e -> e
                        .headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
                                + IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
                .handle(Tcp.outboundAdapter(serverFactory()))
                .get();
    }

    private void mapConnectionIds(Map<String, Object> h) {
        try {
            TcpConnection connection = threadConnectionFactory().getConnection();
            String mapping = toFromConnectionMappings().get(connection.getConnectionId());
            String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
            if (mapping == null || !(mapping.equals(incomingCID))) {
                System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
                toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
                fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
        return new ThreadAffinityClientConnectionFactory(clientFactory()) {

            @Override
            public boolean isSingleUse() {
                return false;
            }

        };
    }

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        return Tcp.netServer(1234)
                .serializer(serializer())
                .deserializer(serializer())
                .get();
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235)
                .serializer(serializer())
                .deserializer(serializer())
                .get();
        clientFactory.setSingleUse(true);
        return clientFactory;
    }

    @Bean
    public IntegrationFlow backEndEmulatorFlow() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)
                    .serializer(serializer())
                    .deserializer(serializer())))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo") + m.get("foo"));
                    return m;
                })
                .get();
    }

    @Bean
    public ApplicationListener<TcpConnectionCloseEvent> closer() {
        return e -> {
            if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
                String key = fromToConnectionMappings().remove(e.getConnectionId());
                toFromConnectionMappings().remove(key);
                System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
                threadConnectionFactory().releaseConnection();
            }
        };
    }

}

and the output:

Adding new mapping localhost:56998:1234:55c822a4-4252-45e6-9ef2-79263391f4be to localhost:1235:56999:3d520ca9-2f3a-44c3-b05f-e59695b8c1b0
{"foo":"barbarBARBAR"}
Removed mapping localhost:56998:1234:55c822a4-4252-45e6-9ef2-79263391f4be to localhost:1235:56999:3d520ca9-2f3a-44c3-b05f-e59695b8c1b0