1
votes

I migrated legacy TCP server code to Spring Boot and added the (annotation-based) Spring Integration dependencies to handle TCP socket connections.

My inbound channel is tcpIn(), my outbound channel is serviceChannel(), and I have created a custom channel, exceptionEventChannel(), to hold exception event messages.

I have a custom serializer/deserializer (ByteArrayLengthPrefixSerializer, which extends AbstractPooledBufferByteArraySerializer) and a MessageHandler @ServiceActivator method that sends the response back to the TCP client.


//SpringBoot 2.0.3.RELEASE, Spring Integration 5.0.6.RELEASE

package com.test.config;


import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.TcpDeserializationExceptionEvent;
import org.springframework.integration.router.ErrorMessageExceptionTypeRouter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;

import java.io.IOException;

/**
 * Spring Integration wiring for the TCP server.
 *
 * <p>Declares the message channels ({@code tcpIn}, {@code serviceChannel},
 * {@code errorChannel}, {@code exceptionEventChannel}), the server connection
 * factory with its length-prefix (de)serializer, the inbound channel adapter,
 * the outbound sending handler, and a listener that republishes
 * {@link TcpDeserializationExceptionEvent}s as messages.
 */
@Configuration
@IntegrationComponentScan
public class TcpConfiguration {

    /** Maximum message size handed to the (de)serializer: 96 KiB (98304 bytes). */
    private static final int MAX_MESSAGE_SIZE_BYTES = 96 * 1024;

    /** Port the server connection factory is created with; injected from {@code tcp.connection.port}. */
    @Value("${tcp.connection.port}")
    private int tcpPort;

    /** Registers a {@code TcpConnectionEventListener} bean. */
    @Bean
    TcpConnectionEventListener customerTcpListener() {
        return new TcpConnectionEventListener();
    }

    /** Channel the inbound adapter writes received messages to. */
    @Bean
    public MessageChannel tcpIn() {
        return new DirectChannel();
    }

    /** Input channel of the {@link TcpSendingMessageHandler} declared by {@link #out}. */
    @Bean
    public MessageChannel serviceChannel() {
        return new DirectChannel();
    }

    /** Declares an {@code errorChannel} bean only when no bean of that name exists yet. */
    @ConditionalOnMissingBean(name = "errorChannel")
    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }

    /**
     * Channel receiving both the inbound adapter's error messages and the
     * deserialization-exception causes published by {@link #listener()}.
     */
    @Bean
    public MessageChannel exceptionEventChannel() {
        return new DirectChannel();
    }

    /** Length-prefix (de)serializer capped at {@link #MAX_MESSAGE_SIZE_BYTES}. */
    @Bean
    public ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer() {
        ByteArrayLengthPrefixSerializer serializer = new ByteArrayLengthPrefixSerializer();
        serializer.setMaxMessageSize(MAX_MESSAGE_SIZE_BYTES);
        return serializer;
    }

    /**
     * Server connection factory listening on {@link #tcpPort}, using the
     * length-prefix bean for both serialization and deserialization.
     */
    @Bean
    public AbstractServerConnectionFactory tcpNetServerConnectionFactory() {
        TcpNetServerConnectionFactory tcpServerCf = new TcpNetServerConnectionFactory(tcpPort);
        tcpServerCf.setSerializer(byteArrayLengthPrefixSerializer());
        tcpServerCf.setDeserializer(byteArrayLengthPrefixSerializer());
        return tcpServerCf;
    }

    /**
     * Inbound adapter: messages from the connection factory go to {@code tcpIn},
     * errors go to {@code exceptionEventChannel}.
     */
    @Bean
    public TcpReceivingChannelAdapter tcpReceivingChannelAdapter() {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(tcpNetServerConnectionFactory());
        adapter.setOutputChannel(tcpIn());
        adapter.setErrorChannel(exceptionEventChannel());
        return adapter;
    }

    /**
     * Prints every message arriving on {@code exceptionEventChannel} and forwards
     * its string form to {@code serviceChannel}.
     *
     * <p>NOTE(review): {@link #listener()} publishes the event's cause as payload,
     * which is not necessarily a {@link MessagingException} - confirm the declared
     * payload type. Also, nothing here sets a connection id header on the reply;
     * presumably the sending handler needs one to pick a socket - verify.
     *
     * @param msg the message received on {@code exceptionEventChannel}
     * @return {@code msg.toString()}
     */
    @ServiceActivator(inputChannel = "exceptionEventChannel", outputChannel = "serviceChannel")
    public String handle(Message<MessagingException> msg) {
        System.out.println("-----------------EXCEPTION ==> " + msg);
        return msg.toString();
    }

    /**
     * Prints the payload received on {@code errorChannel} and passes it on
     * unchanged to {@code serviceChannel}.
     *
     * @param msg the payload as a string
     * @return the same string
     */
    @Transformer(inputChannel = "errorChannel", outputChannel = "serviceChannel")
    public String transformer(String msg) {
        System.out.println("-----------------ERROR ==> " + msg);
        return msg;
    }

    /**
     * Outbound handler subscribed to {@code serviceChannel}; sends through the
     * given server connection factory.
     *
     * <p>This method only builds the handler bean; it is not itself invoked per message.
     *
     * @param cf the server connection factory to send through
     * @return the configured sending handler
     */
    @ServiceActivator(inputChannel = "serviceChannel")
    @Bean
    public TcpSendingMessageHandler out(AbstractServerConnectionFactory cf) {
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(cf);
        return tcpSendingMessageHandler;
    }

    /**
     * Listener that wraps the cause of each {@link TcpDeserializationExceptionEvent}
     * in a message and sends it to {@code exceptionEventChannel}.
     *
     * <p>Kept as an anonymous class (not a lambda) so the generic event type stays
     * visible on the listener class.
     */
    @Bean
    public ApplicationListener<TcpDeserializationExceptionEvent> listener() {
        return new ApplicationListener<TcpDeserializationExceptionEvent>() {

            @Override
            public void onApplicationEvent(TcpDeserializationExceptionEvent tcpDeserializationExceptionEvent) {
                exceptionEventChannel().send(MessageBuilder.withPayload(tcpDeserializationExceptionEvent.getCause())
                        .build());
            }

        };
    }
}

Messages in tcpIn() are sent to a @ServiceActivator method inside a separate @Component class, which is structured like so:

/**
 * Component holding the service activator that consumes messages from
 * {@code tcpIn} and replies on {@code serviceChannel}.
 * (The excerpt ends before the class's closing brace.)
 */
@Component
public class TcpServiceActivator {

    /** No-argument constructor; it takes no dependencies. */
    @Autowired
    public TcpServiceActivator() {
    }


    /**
     * Handles one message from {@code tcpIn}; the returned string is sent to
     * {@code serviceChannel}.
     *
     * @param byteMessage the message payload as raw bytes
     * @return the acknowledgement response (the business logic is elided in this excerpt)
     */
    @ServiceActivator(inputChannel = "tcpIn", outputChannel = "serviceChannel")
    public String service(byte[] byteMessage) {
     // Business Logic returns String Ack Response
    }

I don't have any issues running a success scenario. My TCP test client gets the Ack response as expected.

However, when I try to simulate an exception, say a deserializer exception, the exception message is not sent back as a response to the TCP client. I can see my ApplicationListener receiving the TcpDeserializationExceptionEvent and sending the message to exceptionEventChannel. The @ServiceActivator method handle(Message msg) also prints my exception message. But execution never reaches the breakpoints (in debug mode) inside the MessageHandler method out(AbstractServerConnectionFactory cf).

I am struggling to understand what's going wrong. Thanks in advance for any help.


UPDATE: I notice that the socket is closed because of the exception before the response can be sent. I'm trying to figure out a way around this.

SOLUTION UPDATE (12th Mar 2019) :

Courtesy of Gary, I edited my deserializer to return a message that can be recognized by a @Router method and redirected to errorChannel. The ServiceActivator listening on errorChannel then sends the desired error message to outputChannel. This solution seems to work.

My deserializer method inside ByteArrayLengthPrefixSerializer now returns a "special value", as Gary recommended, instead of the original inputStream message:

    /**
     * Reads one length-prefixed payload from the stream.
     *
     * <p>Returns the payload copied out of {@code buffer} when the prefix is
     * positive and the buffer could be filled; in every other case - including a
     * {@link SoftEndOfStreamException} - returns the bytes of
     * {@code EventType.MSG_INVALID.getName()} as a sentinel for downstream routing.
     *
     * <p>The sentinel is encoded with the platform default charset; whoever
     * compares against it must encode the name the same way.
     *
     * <p>NOTE(review): if {@code readPrefix} raises {@code SoftEndOfStreamException}
     * on an ordinary client disconnect, returning a payload here instead of
     * propagating may keep the connection's read loop going - confirm.
     *
     * @param inputStream stream to read the prefix and payload from
     * @param buffer      working buffer the payload is read into
     * @return the payload bytes, or the {@code MSG_INVALID} sentinel bytes
     * @throws IOException if reading fails with anything other than a soft end of stream
     */
    public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
        try {
            int messageLength = this.readPrefix(inputStream);
            if (messageLength > 0 && fillUntilMaxDeterminedSize(inputStream, buffer, messageLength)) {
                return this.copyToSizedArray(buffer, messageLength);
            }
        } catch (SoftEndOfStreamException ignored) {
            // Deliberately not propagated: a soft end of stream is reported as an invalid message below.
        }
        return EventType.MSG_INVALID.getName().getBytes();
    }

I also made a few new channels to accommodate my router, so that the flow is as follows:

Success flow tcpIn (@Router) -> serviceChannel(@serviceActivator that holds business logic) -> outputChannel (@serviceActivator that sends response to client)

Exception flow tcpIn (@Router) -> errorChannel(@serviceActivator that prepares the error Response message) -> outputChannel (@serviceActivator that sends response to client)

My @Router and 'errorHandling' @serviceActivator -

    /**
     * Chooses the next channel for a payload received on {@code tcpIn}.
     *
     * <p>The payload is printed (decoded as US-ASCII) and then compared with the
     * bytes of {@code EventType.MSG_INVALID.getName()}: a match goes to
     * {@code errorChannel}, anything else to {@code serviceChannel}.
     *
     * @param byteMessage raw payload bytes
     * @return the name of the channel to route to
     */
    @Router(inputChannel = "tcpIn", defaultOutputChannel = "errorChannel")
    public String messageRouter(byte[] byteMessage) {
        System.out.println("------------------> " + new String(byteMessage, StandardCharsets.US_ASCII));
        final byte[] invalidMarker = EventType.MSG_INVALID.getName().getBytes();
        return Arrays.equals(invalidMarker, byteMessage) ? "errorChannel" : "serviceChannel";
    }


    /**
     * Builds the reply for anything routed to {@code errorChannel}: the incoming
     * bytes are not inspected, and {@code Message.ACK_RETRY} is sent on to
     * {@code outputChannel}.
     *
     * @param byteMessage the payload that was routed here (unused)
     * @return {@code Message.ACK_RETRY}
     */
    @ServiceActivator(inputChannel = "errorChannel", outputChannel = "outputChannel")
    public String errorHandler(byte[] byteMessage) {
        final String retryAck = Message.ACK_RETRY;
        return retryAck;
    }

1

1 Answer

1
votes

The error channel is for handling exceptions that occur while processing a message. Deserialization errors occur before a message is created (the deserializer decodes the payload for the message).

Deserialization exceptions are fatal and, as you have observed, the socket is closed.

One option would be to catch the exception in the deserializer and return a "special" value that indicates a deserialization exception occurred, then check for that value in your main flow.