0
votes

I have a requirement where i am (my application) receiving commands (string values) in a Spring Integration channel through TCP connections. When I receive a command, the application sends it over to a router to route the request to different channels in order to activate services using service activators.

The router first validates the command. If the command is not valid the router should not allow further processing and a custom error message needs to be sent back to the sender (TCP Client). I want a solution where I can analyze my command before or during routing and send the error response without any further processing ?

Following are the configurations and code.

Spring Integration Config

    <ip:tcp-connection-factory id="server"
    type="server"
    port="${port}"
    using-nio="true"
    serializer="stxEtxSerializer"
    deserializer="stxEtxSerializer" />

<ip:tcp-inbound-channel-adapter id="inAdapter.server"
    channel="adiIncomingChannel"
    connection-factory="server" />

<channel id="adiIncomingChannel" datatype="java.lang.String" />

<channel id="toSA" datatype="java.lang.String" />
<service-activator input-channel="toSA"
                   ref="tcpMessageReceiver"
                   method="receiveTcpMessageBytes" />

<beans:bean id="tcpMessageReceiver"
      class="com.gcaa.adi.tcp.socket.TcpMessageReceiver" />

<channel id="toObAdapter" />

<ip:tcp-outbound-channel-adapter id="outAdapter.server"
    channel="toObAdapter"
    connection-factory="server" />


<router input-channel="adiIncomingChannel" ref="cmdRouter"></router>    



<beans:bean id="cmdRouter" class="com.gcaa.adi.messaging.router.CommandRouter">
</beans:bean>

Router Class

public class CommandRouter{

public String contentRouter(byte[] cmdBytes){
    String channelName = null;
    String cmd = new String(cmdBytes);

    //Data Requests
    if(cmd.startsWith(AdiCommandType.DATA_REQUEST.getCode())){

        AdiCommandValidationModel validCmdModel = validateDataCommand(cmd);

        if(! validCmdModel.isValidCmd()){
            // TODO: Send the error to  requesting client. Error = "<ERROR::<DATA::REQUEST>::001>"
        }
        channelName = "toSA";


    } // Health Check Response
    else if(cmd.startsWith("<HEALTH::ACK>")){
        channelName = "toSA";
    }

    return channelName;
}

private AdiCommandValidationModel validateDataCommand(String cmd){
    AdiCommandValidationModel validModel = new AdiCommandValidationModel();
    validModel.setCmd(cmd);

    List<MatchedGroup> matchedCmdItems = GcaaUtil.matchByGroups("^<DATA::(\\d[1-31]-\\d[1-12]-20\\d\\d)::(\\d+)>", cmd);

    if(null != matchedCmdItems && matchedCmdItems.size() > 0){

        // 1. Check if request date is valid
        if(!GcaaUtil.isEmptyString(matchedCmdItems.get(0).getGroup())){
            boolean isValidCmd = GcaaUtil.isDateValid(matchedCmdItems.get(0).getGroup(), "DD-MM-YYYY");
            validModel.setValidCmd(isValidCmd);             
        }else{              
            validModel.setValidCmd(false);
        }

        // return if date is not valid
        if(! validModel.isValidCmd()){
            validModel.setErrorCode(AdiErrorCode.DATA_INVALID_DATE);
            return validModel;
        }

        // 2. Check that DI value is >=0. It should not be less than 0 or NULL
        if(!GcaaUtil.isEmptyString(matchedCmdItems.get(1).getGroup())){
            Integer di = null;
            try{
                di = Integer.parseInt(matchedCmdItems.get(1).getGroup());
            }catch(NumberFormatException nfe){

            }
            if(di== null || di < 0){validModel.setErrorCode(AdiErrorCode.DATA_INVALID_DATE);
            return validModel;}
        }else{
            validModel.setErrorCode(AdiErrorCode.INVALID_COMMAND);
            return validModel;
        }
    }


    if(! validModel.isValidCmd()){
        validModel.setErrorCode(AdiErrorCode.INVALID_COMMAND);
        return validModel;
    }
    return validModel;
}
}

Updated - Solution 2

I have tried the solution referred by you in this this link but I am getting an error pasted below.

Updated Service Activator code is listed below. I have removed the validation from the Router class and moved it to ServiceActivator.

Validation ServiceActivator:

public void validate(Message<byte[]> msg){
    String channelName = null;
    String cmd = new String(msg.getPayload());

    AdiCommandValidationModel validCmdModel = validateDataCommand(cmd);

    if(! validCmdModel.isValidCmd()){
        // TODO: Send to error channel 
        String err = "<ERROR::"+validCmdModel.getCmd()+"::"+validCmdModel.getErrorCode().getCode()+">";
        Message<String> mb = MessageBuilder.createMessage(err, msg.getHeaders());
        throw new MessagingException(mb);
    }

    //Data Requests
    if(cmd.startsWith(AdiCommandType.DATA_REQUEST.getCode())){

        validCmdModel = validateDataCommand(cmd);

        if(! validCmdModel.isValidCmd()){
            // TODO: Send to error channel
            throw new MessagingException(new GenericMessage<String>(validCmdModel.getValdationMsg()));
        }
        channelName = "toSA";


    } // Health Check Response
    else if(cmd.startsWith("<HEALTH::ACK>")){
        channelName = "toSA";   
    }

}

Modified Config for Validation ServiceActivator

  <!-- Server side -->
    <ip:tcp-connection-factory id="server"
        type="server"
        port="${port}"
        using-nio="true"
        serializer="stxEtxSerializer"
        deserializer="stxEtxSerializer" />

    <ip:tcp-inbound-channel-adapter id="inAdapter.server"
        channel="adiIncomingChannel"
        connection-factory="server" 
        error-channel="inputErrorChannel"/>


    <!-- <object-to-string-transformer input-channel="adiIncomingChannel" output-channel="adiIncomingChannelStr"/> -->

    <service-activator input-channel="adiIncomingChannel"
                       ref="adiInputValidator"
                       method="validate" />

    <channel id="adiIncomingChannel"/>
    <!-- <channel id="adiIncomingChannel" datatype="java.lang.String" /> -->
    <channel id="inputErrorChannel" />

    <channel id="toObAdapter" />

    <ip:tcp-outbound-channel-adapter id="outAdapter.server"
        channel="toObAdapter"
        connection-factory="server" />

    <transformer input-channel="inputErrorChannel"
    output-channel="toObAdapter"
    expression="payload.failedMessage.payload"/>

<!-- dataType attribute invokes the conversion service -->
    <channel id="toSA" datatype="java.lang.String" />

    <service-activator input-channel="toSA"
                       ref="tcpMessageReceiver"
                       method="receiveTcpMessageBytes" />

<!-- output-channel="toObAdapter" -->
    <beans:bean id="tcpMessageReceiver"
          class="com.gcaa.adi.tcp.socket.TcpMessageReceiver" />

    <beans:bean id="adiInputValidator"
          class="com.gcaa.adi.validation.AdiInputValidator" />

Exception from solution 2 ---***------------------* IN

TcpConnectionFailedCorrelation -*-*-*-*-*-*-*-*-*-*-**-*
     Connection Correlation ID: null
     Connection Source: org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0
     Connection time: 1486465992372
     Cause = : org.springframework.messaging.MessageHandlingException: Unable to find outbound socket
2017-02-07 15:13:13.183 ERROR 4444 --- [pool-2-thread-3] o.s.i.i.tcp.connection.TcpNioConnection  : Exception sending message: GenericMessage [payload=byte[10], headers={ip_tcp_remotePort=59521, ip_connectionId=127.0.0.1:59521:5677:9a7fd07e-716b-4d1c-8be6-c73223cea9db, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=ca87761a-4a39-5869-ad53-7e078cf40508, ip_hostname=127.0.0.1, timestamp=1486465989192}]

org.springframework.messaging.MessageHandlingException: Unable to find outbound socket
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123) ~[spring-integration-ip-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:176) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter.onMessage(TcpReceivingChannelAdapter.java:88) ~[spring-integration-ip-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.ip.tcp.connection.TcpNioConnection.sendToChannel(TcpNioConnection.java:371) [spring-integration-ip-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.ip.tcp.connection.TcpNioConnection.run(TcpNioConnection.java:240) [spring-integration-ip-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_74]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_74]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_74]
1

1 Answers

2
votes

You would need to show the rest of your (upstream) configuration but, if the inbound request is via a gateway, you would typically throw an exception in the router and handle the error on the gateway's error-channel flow.