I have a requirement where i am (my application) receiving commands (string values) in a Spring Integration channel through TCP connections. When I receive a command, the application sends it over to a router to route the request to different channels in order to activate services using service activators.
The router first validates the command. If the command is not valid the router should not allow further processing and a custom error message needs to be sent back to the sender (TCP Client). I want a solution where I can analyze my command before or during routing and send the error response without any further processing ?
Following are the configurations and code.
Spring Integration Config
<ip:tcp-connection-factory id="server"
type="server"
port="${port}"
using-nio="true"
serializer="stxEtxSerializer"
deserializer="stxEtxSerializer" />
<ip:tcp-inbound-channel-adapter id="inAdapter.server"
channel="adiIncomingChannel"
connection-factory="server" />
<channel id="adiIncomingChannel" datatype="java.lang.String" />
<channel id="toSA" datatype="java.lang.String" />
<service-activator input-channel="toSA"
ref="tcpMessageReceiver"
method="receiveTcpMessageBytes" />
<beans:bean id="tcpMessageReceiver"
class="com.gcaa.adi.tcp.socket.TcpMessageReceiver" />
<channel id="toObAdapter" />
<ip:tcp-outbound-channel-adapter id="outAdapter.server"
channel="toObAdapter"
connection-factory="server" />
<router input-channel="adiIncomingChannel" ref="cmdRouter"></router>
<beans:bean id="cmdRouter" class="com.gcaa.adi.messaging.router.CommandRouter">
</beans:bean>
Router Class
public class CommandRouter{
public String contentRouter(byte[] cmdBytes){
String channelName = null;
String cmd = new String(cmdBytes);
//Data Requests
if(cmd.startsWith(AdiCommandType.DATA_REQUEST.getCode())){
AdiCommandValidationModel validCmdModel = validateDataCommand(cmd);
if(! validCmdModel.isValidCmd()){
// TODO: Send the error to requesting client. Error = "<ERROR::<DATA::REQUEST>::001>"
}
channelName = "toSA";
} // Health Check Response
else if(cmd.startsWith("<HEALTH::ACK>")){
channelName = "toSA";
}
return channelName;
}
private AdiCommandValidationModel validateDataCommand(String cmd){
AdiCommandValidationModel validModel = new AdiCommandValidationModel();
validModel.setCmd(cmd);
List<MatchedGroup> matchedCmdItems = GcaaUtil.matchByGroups("^<DATA::(\\d[1-31]-\\d[1-12]-20\\d\\d)::(\\d+)>", cmd);
if(null != matchedCmdItems && matchedCmdItems.size() > 0){
// 1. Check if request date is valid
if(!GcaaUtil.isEmptyString(matchedCmdItems.get(0).getGroup())){
boolean isValidCmd = GcaaUtil.isDateValid(matchedCmdItems.get(0).getGroup(), "DD-MM-YYYY");
validModel.setValidCmd(isValidCmd);
}else{
validModel.setValidCmd(false);
}
// return if date is not valid
if(! validModel.isValidCmd()){
validModel.setErrorCode(AdiErrorCode.DATA_INVALID_DATE);
return validModel;
}
// 2. Check that DI value is >=0. It should not be less than 0 or NULL
if(!GcaaUtil.isEmptyString(matchedCmdItems.get(1).getGroup())){
Integer di = null;
try{
di = Integer.parseInt(matchedCmdItems.get(1).getGroup());
}catch(NumberFormatException nfe){
}
if(di== null || di < 0){validModel.setErrorCode(AdiErrorCode.DATA_INVALID_DATE);
return validModel;}
}else{
validModel.setErrorCode(AdiErrorCode.INVALID_COMMAND);
return validModel;
}
}
if(! validModel.isValidCmd()){
validModel.setErrorCode(AdiErrorCode.INVALID_COMMAND);
return validModel;
}
return validModel;
}
}
Updated - Solution 2
I have tried the solution referred by you in this this link but I am getting an error pasted below.
Updated Service Activator code is listed below. I have removed the validation from the Router class and moved it to ServiceActivator.
Validation ServiceActivator:
public void validate(Message<byte[]> msg){
String channelName = null;
String cmd = new String(msg.getPayload());
AdiCommandValidationModel validCmdModel = validateDataCommand(cmd);
if(! validCmdModel.isValidCmd()){
// TODO: Send to error channel
String err = "<ERROR::"+validCmdModel.getCmd()+"::"+validCmdModel.getErrorCode().getCode()+">";
Message<String> mb = MessageBuilder.createMessage(err, msg.getHeaders());
throw new MessagingException(mb);
}
//Data Requests
if(cmd.startsWith(AdiCommandType.DATA_REQUEST.getCode())){
validCmdModel = validateDataCommand(cmd);
if(! validCmdModel.isValidCmd()){
// TODO: Send to error channel
throw new MessagingException(new GenericMessage<String>(validCmdModel.getValdationMsg()));
}
channelName = "toSA";
} // Health Check Response
else if(cmd.startsWith("<HEALTH::ACK>")){
channelName = "toSA";
}
}
Modified Config for Validation ServiceActivator
<!-- Server side -->
<ip:tcp-connection-factory id="server"
type="server"
port="${port}"
using-nio="true"
serializer="stxEtxSerializer"
deserializer="stxEtxSerializer" />
<ip:tcp-inbound-channel-adapter id="inAdapter.server"
channel="adiIncomingChannel"
connection-factory="server"
error-channel="inputErrorChannel"/>
<!-- <object-to-string-transformer input-channel="adiIncomingChannel" output-channel="adiIncomingChannelStr"/> -->
<service-activator input-channel="adiIncomingChannel"
ref="adiInputValidator"
method="validate" />
<channel id="adiIncomingChannel"/>
<!-- <channel id="adiIncomingChannel" datatype="java.lang.String" /> -->
<channel id="inputErrorChannel" />
<channel id="toObAdapter" />
<ip:tcp-outbound-channel-adapter id="outAdapter.server"
channel="toObAdapter"
connection-factory="server" />
<transformer input-channel="inputErrorChannel"
output-channel="toObAdapter"
expression="payload.failedMessage.payload"/>
<!-- dataType attribute invokes the conversion service -->
<channel id="toSA" datatype="java.lang.String" />
<service-activator input-channel="toSA"
ref="tcpMessageReceiver"
method="receiveTcpMessageBytes" />
<!-- output-channel="toObAdapter" -->
<beans:bean id="tcpMessageReceiver"
class="com.gcaa.adi.tcp.socket.TcpMessageReceiver" />
<beans:bean id="adiInputValidator"
class="com.gcaa.adi.validation.AdiInputValidator" />
Exception from solution 2 ---***------------------* IN
TcpConnectionFailedCorrelation -*-*-*-*-*-*-*-*-*-*-**-*
Connection Correlation ID: null
Connection Source: org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0
Connection time: 1486465992372
Cause = : org.springframework.messaging.MessageHandlingException: Unable to find outbound socket
2017-02-07 15:13:13.183 ERROR 4444 --- [pool-2-thread-3] o.s.i.i.tcp.connection.TcpNioConnection : Exception sending message: GenericMessage [payload=byte[10], headers={ip_tcp_remotePort=59521, ip_connectionId=127.0.0.1:59521:5677:9a7fd07e-716b-4d1c-8be6-c73223cea9db, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=ca87761a-4a39-5869-ad53-7e078cf40508, ip_hostname=127.0.0.1, timestamp=1486465989192}]
org.springframework.messaging.MessageHandlingException: Unable to find outbound socket
at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123) ~[spring-integration-ip-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:176) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter.onMessage(TcpReceivingChannelAdapter.java:88) ~[spring-integration-ip-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.ip.tcp.connection.TcpNioConnection.sendToChannel(TcpNioConnection.java:371) [spring-integration-ip-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.integration.ip.tcp.connection.TcpNioConnection.run(TcpNioConnection.java:240) [spring-integration-ip-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_74]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_74]
at java.lang.Thread.run(Unknown Source) [na:1.8.0_74]