2
votes

My TCP Server based on spring integration is working great.

It also handles reply-timeout at the server if the service takes long. When the service takes longer than reply-timeout set in it will send the message to error channel and that in turn sends back error message to the client.

Here is my code:

<int:transformer id="errorHandler"
    input-channel="errorChannel"
    ref="myTransformer" method="transform" />
<bean id="myTransformer" class="com.sample.MyTransformer" />

public class MyTransformer {
    private static Logger logger = Logger.getLogger(MyTransformer.class);

    public String transform(org.springframework.integration.handler.ReplyRequiredException e) {
        logger.error("timeout exception is thrown");
        return "Error in processing request.";
    }
}

The above code works and the client gets 'Error in processing request' and the server log has the entry 'timeout exception is thrown'. But I also see the following exception in the log:

2016-06-30 16:25:27,827 ERROR [org.springframework.integration.handler.LoggingHandler] org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'org.springframework.integration.config.ServiceActivatorFactoryBean#0', and its 'requiresReply' property is set to true.
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:422)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:390)
    at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:119)
    at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:97)
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

Implementation of MyTransformer does not seem to be correct.

Can you please help on how to customize transformer? How to get the payload in the transform method so that I can reply to client as 'Error in processing request. payload = ' + payload?

Thanks

UPDATE: To avoid logging ReplyRequiredException, I changed the error-channel on tcp-inbound-gateway as follows:

<int-ip:tcp-inbound-gateway id="gatewayCrLf"
    connection-factory="crLfServer"
    request-channel="requestChannel"
    error-channel="tcpErrorChannel"
    reply-timeout="10000"
    />
<int:channel id="tcpErrorChannel" /> 

<int:service-activator input-channel="requestChannel" ref="gateway" requires-reply="true"/>
<int:gateway id="gateway" default-request-channel="timeoutChannel" default-reply-timeout="10000" />
<int:object-to-string-transformer id="serverBytes2String"
    input-channel="timeoutChannel"
    output-channel="serviceChannel"/>

<int:channel id="timeoutChannel">
    <int:dispatcher task-executor="timeoutExecutor"/>
</int:channel>

<bean id="timeoutExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="100" />
    <property name="queueCapacity" value="50" />
</bean>

But I am getting MessageDeliveryException. tcpErrorChannel setup seems to be wrong. Can you please suggest? Here is the Stacktrace:

2016-07-05 12:17:34,266 ERROR [org.springframework.integration.ip.tcp.connection.TcpNetConnection] Exception sending message: GenericMessage [payload=byte[67], headers={timestamp=1467735444239, id=30eb099e-955d-1bd3-1789-49aa9fc84b6f, ip_tcp_remotePort=64055, ip_address=127.0.0.1, ip_localInetAddress=/127.0.0.1, ip_hostname=127.0.0.1, ip_connectionId=127.0.0.1:64055:5678:908d39a1-d027-4753-b144-59b9c0390fd7}]
org.springframework.messaging.MessagingException: failure occurred in error-handling flow; nested exception is org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.context.support.FileSystemXmlApplicationContext@4876db09.tcpErrorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:452)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:390)
    at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:119)
    at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:97)
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.context.support.FileSystemXmlApplicationContext@4876db09.tcpErrorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:449)
    ... 7 more
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    ... 14 more

The above exception will go away by changing errorChennel to tcpErrorChannel in :

Thanks a lot to Artem for his perfect solution.

1

1 Answers

1
votes

Looks like it is continue of your other question reply-timeout meaning in tcp-inbound-gateway in spring integration.

Well, ReplyRequiredException is as result of the <int:service-activator input-channel="requestChannel" ref="gateway" requires-reply="true"/>.

requires-reply="true" - please, read its description. You can play with that though and make it as false (default one), but then your logic for "timeout" won't work. So, you don't have choice unless continue to live with that:

ERROR [org.springframework.integration.handler.LoggingHandler] org.springframework.integration.handler.ReplyRequiredException

Although, that is just affect of one of subscribers for the default errorChannel: http://docs.spring.io/spring-integration/reference/html/configuration.html#namespace-errorhandler.

Now regarding your last question. All the exception are wrapped into ErrorMessage and sent into that errorChannel. Typically exceptions are wrapped into MessagingException, e.g. in this our case it is like:

else if (this.requiresReply && !isAsync()) {
    throw new ReplyRequiredException(message, "No reply produced by handler '" +
            getComponentName() + "', and its 'requiresReply' property is set to true.");
}

Pay attention into that message ctor argument it is exactly a requestMessage which is without reply in our scenario. And you can extract a desired payload from it.

So, now you can get failedMessage from the ReplyRequiredException in your transform() and extract its payload for your goal.

UPDATE

Since this ReplyRequiredException is a known exception, there is no need to see it as Error.

Well, even it is a known exception, but it is an important one in other components and scenarios when it may not be appropriate do not receive a reply. The ReplyRequiredException is very common through the Framework components, so moving it to different category could be a big architectural error for your application.

To avoid those logs you just should not use a default errorChannel, but some other one from your TCP Inbound Gateway. Something like error-channel="tcpErrorChannel" and that's all. A default errorChannel will still have that LoggingHandler as a subscriber, but you already won't send error messages to that channel.