I have a Spring Integration (SI) flow that consumes org.springframework.web.reactive.socket.WebSocketMessage and does some work with it, including handling its payload using Netty's ByteBuf. At some point, an exception occurred in my flow:
org.springframework.messaging.MessageHandlingException: error occurred in message handler [_org.springframework.integration.errorLogger.handler]; nested exception is io.netty.util.IllegalReferenceCountException: refCnt: 0
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:93) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    ...
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1356) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.ByteBufUtil.hashCode(ByteBufUtil.java:175) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.hashCode(AbstractByteBuf.java:1315) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at org.springframework.core.io.buffer.NettyDataBuffer.hashCode(NettyDataBuffer.java:288) ~[spring-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.web.reactive.socket.WebSocketMessage.hashCode(WebSocketMessage.java:134) ~[spring-webflux-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at java.lang.Object.toString(Object.java:236) ~[?:1.8.0_161]
    at java.lang.String.valueOf(String.java:2994) ~[?:1.8.0_161]
    at java.lang.StringBuilder.append(StringBuilder.java:131) ~[?:1.8.0_161]
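As far as I can tell from the trace, the error logger builds a string from the failed message, Object.toString() calls hashCode(), and WebSocketMessage.hashCode() ends up reading a ByteBuf that has already been released. To check my understanding, I wrote the plain-Java sketch below. It does not use Netty or Spring: RefCountedBuffer and Message are hypothetical stand-ins for ByteBuf and WebSocketMessage, and IllegalStateException stands in for IllegalReferenceCountException.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class ReleasedBufferDemo {

    // Hypothetical stand-in for a reference-counted buffer such as Netty's ByteBuf.
    static final class RefCountedBuffer {
        private final byte[] data;
        private final AtomicInteger refCnt = new AtomicInteger(1);

        RefCountedBuffer(byte[] data) {
            this.data = data.clone();
        }

        // Drops one reference; at zero the buffer must no longer be read.
        void release() {
            refCnt.decrementAndGet();
        }

        private void ensureAccessible() {
            if (refCnt.get() <= 0) {
                throw new IllegalStateException("refCnt: " + refCnt.get());
            }
        }

        // Hashing reads the content, so it fails once the buffer is released.
        @Override
        public int hashCode() {
            ensureAccessible();
            return Arrays.hashCode(data);
        }
    }

    // Hypothetical stand-in for a message that overrides hashCode() but not
    // toString(), so Object.toString() is used and it calls hashCode().
    static final class Message {
        private final RefCountedBuffer payload;

        Message(RefCountedBuffer payload) {
            this.payload = payload;
        }

        @Override
        public int hashCode() {
            return payload.hashCode();
        }
    }

    // Returns true if turning the message into a String throws.
    static boolean toStringFails(Message message) {
        try {
            String.valueOf(message);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        RefCountedBuffer buffer = new RefCountedBuffer(new byte[] {1, 2, 3});
        Message message = new Message(buffer);

        System.out.println("before release, toString fails: " + toStringFails(message));
        buffer.release();
        System.out.println("after release, toString fails: " + toStringFails(message));
    }
}
```

If this model is right, anything that logs or concatenates the message after its payload has been released would throw, which matches the first trace. It does not by itself explain why the channel loses its subscriber afterwards.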
After that, handling all binary web socket messages fails with the following exception:
2018-11-26T10:38:29,133 ERROR --- [-server-epoll-7] o.s.i.h.LoggingHandler (:) org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'binaryWebSocketMessageChannel'; nested exception is java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] doesn't have subscribers to accept messages, failedMessage=GenericMessage [payload=MyPayload(payload=org.springframework.web.reactive.socket.WebSocketMessage@38552d5, session=ReactorNettyWebSocketSession[id=3e0be929, uri=http://localhost:8080/]), headers={id=b09a89ff-f7be-1b43-6f62-40e5c0b5695a, timestamp=1543225109132}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:183)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:205)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:55)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:138)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:127)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
    at reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:79)
    ...
Caused by: java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] doesn't have subscribers to accept messages
    at org.springframework.util.Assert.state(Assert.java:94)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    ... 57 more
Could anyone point me in a direction to try to solve the problem? Also, in which cases would an SI EIP component (router, transformer, filter, service activator) unsubscribe from its channel?
For reference, the channel type is org.springframework.integration.channel.FluxMessageChannel.
Edit:
My flow looks like this:
WebSocketMessage -> router: (BINARY)  -> binaryWebSocketMessageChannel -> ...
                            (!BINARY) -> nullChannel
(I know a filter fits better here; I plan to refactor later.)
@ArtemBilan the repo with example is here: https://github.com/ioreskovic/Spring-Integration-flow-loses-subscriber