1
votes

I created a server that returns an infinite Flux and a client that reads objects from the response asynchronously. I want the client to unsubscribe from the Flux and stop processing it.

The server's controller:

@GetMapping(path = "/infinite", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreamOfLongs() {
    return Flux.generate(sink -> sink.next("x"));
}

The client:

WebClient client = WebClient.create("http://localhost:8080");
    Flux<String> flux = client.get()
            .uri("/infinite")
            .accept(TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(String.class);
    Disposable disposable = flux.subscribe(consumer);
    Executors.newSingleThreadScheduledExecutor().schedule(() -> disposable.dispose(), 5, TimeUnit.SECONDS);

Is this the right way to unsubscribe from the stream?
What need the client do when it "wants" to stop reading more data?

When the client unsubscribes (with disposable.dispose()) 2 exceptions are thrown in the server side (IOException and UnsupportedOperationException):

java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.writev0(Native Method) ~[na:1.8.0_131]
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51) ~[na:1.8.0_131]
at sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.8.0_131]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504) ~ [na:1.8.0_131]
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at reactor.ipc.netty.channel.ChannelOperationsHandler$PublisherSender.onComplete(ChannelOperationsHandler.java:505) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:130) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:184) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6516) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:433) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:179) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at reactor.ipc.netty.NettyOutbound.lambda$sendObject$6(NettyOutbound.java:298) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:106) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.ipc.netty.NettyOutbound.subscribe(NettyOutbound.java:356) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
at reactor.core.publisher.FluxConcatMap$ConcatMapDelayed.drain(FluxConcatMap.java:744) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxConcatMap$ConcatMapDelayed.onNext(FluxConcatMap.java:581) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:150) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxGenerate$GenerateSubscription.next(FluxGenerate.java:164) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at io.github.msayag.webflux.MyController.lambda$getStreamOfLongs$0(MyController.java:44) ~[classes/:na]
...
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) ~[netty-common-4.1.16.Final.jar:4.1.16.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]

followed by

2017-11-24 01:04:09.476 ERROR 83663 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter : Failed to handle request

java.lang.UnsupportedOperationException: null
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) ~[na:1.8.0_131]
at org.springframework.http.HttpHeaders.set(HttpHeaders.java:1439) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.http.HttpHeaders.setContentType(HttpHeaders.java:849) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler.write(AbstractErrorWebExceptionHandler.java:235) ~[spring-boot-autoconfigure-2.0.0.M6.jar:2.0.0.M6]
at org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler.lambda$handle$1(AbstractErrorWebExceptionHandler.java:228) ~[spring-boot-autoconfigure-2.0.0.M6.jar:2.0.0.M6]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1092) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1649) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:230) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:172) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1649) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1463) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1337) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Operators.complete(Operators.java:125) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:185) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:251) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Operators.error(Operators.java:175) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:129) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at org.springframework.http.server.reactive.ChannelSendOperator$WriteCompletionBarrier.onError(ChannelSendOperator.java:339) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:87) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1332) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1135) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onError(MonoIgnoreThen.java:300) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:75) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxConcatMap$ConcatMapDelayed.drain(FluxConcatMap.java:660) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxConcatMap$ConcatMapDelayed.onNext(FluxConcatMap.java:581) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:150) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxGenerate$GenerateSubscription.next(FluxGenerate.java:164) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at io.github.msayag.webflux.MyController.lambda$getStreamOfLongs$0(MyController.java:44) ~[classes/:na]
...
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) ~[netty-common-4.1.16.Final.jar:4.1.16.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]

1

1 Answers

2
votes

You're doing this right as far as I know.

Disposable::dispose effectively cancels the stream, which is what you should do from a Subscriber point of view when you're not interested in receiving data anymore.

Calling that on the WebClient side will result in closing the HTTP connection. I don't think there's a "cleaner" way of telling the server you don't want to receive data anymore. With HTTP/2, things might be different as HTTP streams can be closed without closing the whole connection.

From the server's point of view, a client cancelling voluntarily looks the same as a client closing the connection because of an error. So the exceptions both signal that

  1. the connection was closed while the server was trying to write to it
  2. the response hasn't been handled properly (the server still had things to write)

If you have enhancement ideas about this behaviour, please create a ticket on https://jira.spring.io