I am developing a reactive REST API using the reactive MongoDB driver and the Spring WebFlux framework.

Everything works fine when the concurrency level is below 100. However, when I increase it to 200, BlockHound reports blocking calls inside the Mongo driver.

I am running the MongoDB Community server (mongod) on Windows, with BlockHound installed in the Spring Boot application, and I use ApacheBench to generate the concurrent load.

The error is below. It is reported only when the concurrency level is above 100.

Any suggestions on how to resolve this error?

Please note that I have reported this issue to the MongoDB community as well.

They replied that it may be a false positive, because the system may not have enough resources to create more connections. But if there are no resources to create more connections, should BlockHound report that as a blocking call?

2021-01-02 09:52:17.399 ERROR 18688 --- [reactor-http-nio-2] org.mongodb.driver.operation : Callback onResult call produced an error.

reactor.blockhound.BlockingOperationError: Blocking call! jdk.internal.misc.Unsafe#park
    at java.base/jdk.internal.misc.Unsafe.park(Unsafe.java)
    at java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240)
    at java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267)
    at java.base/java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:409)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1347)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at java.base/java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:714)
    at com.mongodb.internal.connection.DefaultConnectionPool.getAsync(DefaultConnectionPool.java:157)
    at com.mongodb.internal.connection.DefaultServer.getConnectionAsync(DefaultServer.java:105)
    at com.mongodb.internal.binding.AsyncClusterBinding$AsyncClusterBindingConnectionSource.getConnection(AsyncClusterBinding.java:131)
    at com.mongodb.internal.async.client.ClientSessionBinding$SessionBindingAsyncConnectionSource.getConnection(ClientSessionBinding.java:140)
    at com.mongodb.internal.operation.OperationHelper.withAsyncConnectionSource(OperationHelper.java:730)
    at com.mongodb.internal.operation.OperationHelper.access$200(OperationHelper.java:68)
    at com.mongodb.internal.operation.OperationHelper$AsyncCallableWithConnectionAndSourceCallback.onResult(OperationHelper.java:750)
    at com.mongodb.internal.operation.OperationHelper$AsyncCallableWithConnectionAndSourceCallback.onResult(OperationHelper.java:738)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
    at com.mongodb.internal.async.client.ClientSessionBinding$WrappingCallback.onResult(ClientSessionBinding.java:208)
    at com.mongodb.internal.async.client.ClientSessionBinding$WrappingCallback.onResult(ClientSessionBinding.java:196)
    at com.mongodb.internal.binding.AsyncClusterBinding$1.onResult(AsyncClusterBinding.java:105)
    at com.mongodb.internal.binding.AsyncClusterBinding$1.onResult(AsyncClusterBinding.java:99)
    at com.mongodb.internal.connection.BaseCluster$ServerSelectionRequest.onResult(BaseCluster.java:432)
    at com.mongodb.internal.connection.BaseCluster.handleServerSelectionRequest(BaseCluster.java:299)
    at com.mongodb.internal.connection.BaseCluster.selectServerAsync(BaseCluster.java:155)
    at com.mongodb.internal.connection.SingleServerCluster.selectServerAsync(SingleServerCluster.java:42)
    at com.mongodb.internal.binding.AsyncClusterBinding.getAsyncClusterBindingConnectionSource(AsyncClusterBinding.java:99)
    at com.mongodb.internal.binding.AsyncClusterBinding.getReadConnectionSource(AsyncClusterBinding.java:84)
    at com.mongodb.internal.async.client.ClientSessionBinding.getReadConnectionSource(ClientSessionBinding.java:58)
    at com.mongodb.internal.operation.OperationHelper.withAsyncReadConnection(OperationHelper.java:677)
    at com.mongodb.internal.operation.FindOperation.executeAsync(FindOperation.java:689)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1$1.onResult(OperationExecutorImpl.java:86)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1$1.onResult(OperationExecutorImpl.java:74)
    at com.mongodb.internal.async.client.OperationExecutorImpl.getReadWriteBinding(OperationExecutorImpl.java:177)
    at com.mongodb.internal.async.client.OperationExecutorImpl.access$200(OperationExecutorImpl.java:43)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1.onResult(OperationExecutorImpl.java:72)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1.onResult(OperationExecutorImpl.java:66)
    at com.mongodb.internal.async.client.ClientSessionHelper.createClientSession(ClientSessionHelper.java:60)
    at com.mongodb.internal.async.client.ClientSessionHelper.withClientSession(ClientSessionHelper.java:51)
    at com.mongodb.internal.async.client.OperationExecutorImpl.execute(OperationExecutorImpl.java:66)
    at com.mongodb.internal.async.client.AsyncMongoIterableImpl.batchCursor(AsyncMongoIterableImpl.java:167)
    at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.requestInitialData(MongoIterableSubscription.java:45)
    at com.mongodb.reactivestreams.client.internal.AbstractSubscription.tryRequestInitialData(AbstractSubscription.java:177)
    at com.mongodb.reactivestreams.client.internal.AbstractSubscription.request(AbstractSubscription.java:100)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:235)
    at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.<init>(MongoIterableSubscription.java:39)
    at com.mongodb.reactivestreams.client.internal.Publishers.lambda$publish$0(Publishers.java:43)
    at com.mongodb.reactivestreams.client.internal.FindPublisherImpl.subscribe(FindPublisherImpl.java:175)
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:66)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:61)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4046)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259)
    at reactor.core.publisher.Operators.error(Operators.java:196)
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
    at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199)
    at reactor.core.publisher.MonoFlatMapMany.subscribeOrReturn(MonoFlatMapMany.java:49)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8133)
    at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:93)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:305)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251)
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:99)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:383)
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:396)
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:540)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:252)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
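
Reading the trace from the top, the park does not come from socket I/O. It comes from `ReentrantLock.lock()` inside `LinkedBlockingQueue.offer`, reached when `DefaultConnectionPool.getAsync` submits a task to an executor. That looks like lock contention between event-loop threads submitting at the same moment, which would explain why it only shows up at higher concurrency. BlockHound flags any park on a non-blocking thread, however short. The sketch below uses only the JDK (no Mongo, no BlockHound) to show a contended `ReentrantLock.lock()` parking a thread; the class and method names are made up for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

final class LockParkDemo {

    /** Returns the state of a thread that calls lock() while another thread holds the lock. */
    static Thread.State stateWhileContended() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // the caller holds the lock, like a concurrent submitter holding the queue's put lock
        Thread waiter = new Thread(() -> {
            lock.lock();   // contended: the thread parks until the lock is released
            lock.unlock();
        });
        try {
            waiter.start();
            // Poll (with a 5 s safety deadline) until the waiter has parked.
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (waiter.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
                Thread.yield();
            }
            return waiter.getState();
        } finally {
            lock.unlock(); // lets the waiter acquire the lock and finish
            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("waiter state under contention: " + stateWhileContended());
    }
}
```

If this reading of the trace is right, the report is technically accurate (the thread really parks) but the wait is bounded by how long another thread holds the queue lock, not by the database.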

1 Answer

The Mongo driver commonly throws an exception when its own queue of pending query requests grows too large. This happens because WebFlux passes requests through very efficiently and the Mongo driver chokes on the database side. Back pressure is not propagated across the connection.
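
One way to investigate this is to cap the number of in-flight queries at the application edge, so the driver is never handed more work than it can queue. The sketch below is a minimal JDK-only illustration of that idea, not the driver's or WebFlux's own mechanism; `InFlightLimiter` and its methods are hypothetical names. In a Reactor pipeline the usual way to get a similar cap is the `concurrency` argument of `Flux.flatMap`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Starts the async call if a permit is free; otherwise fails fast without blocking. */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> asyncCall) {
        if (!permits.tryAcquire()) { // tryAcquire never parks the calling thread
            return CompletableFuture.failedFuture(
                    new IllegalStateException("too many in-flight requests"));
        }
        try {
            // Release the permit when the call completes, successfully or not.
            return asyncCall.get().whenComplete((result, error) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // the call never started, so give the permit back
            return CompletableFuture.failedFuture(e);
        }
    }

    int available() {
        return permits.availablePermits();
    }
}
```

Rejected calls can be mapped to an HTTP 429 or 503 so the load generator sees the overload instead of the driver absorbing it.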

I can't be sure without seeing the code, but that is one thing to investigate.

You may also be making a simple mistake in your code and actually calling blocking code in your web handlers.
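
If a handler does turn out to contain a blocking call, the usual remedy is to move that call off the event-loop thread. Below is a JDK-only sketch of the pattern; `BlockingOffload`, `blockingLookup` and the pool size are assumptions for illustration, and `Thread.sleep` stands in for whatever blocking call the handler makes. In Reactor the common equivalent is `Mono.fromCallable(...)` combined with `subscribeOn(Schedulers.boundedElastic())`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingOffload {
    // Hypothetical dedicated pool for blocking work; daemon threads so the JVM can exit.
    static final ExecutorService BLOCKING_POOL = Executors.newFixedThreadPool(4, runnable -> {
        Thread t = new Thread(runnable, "blocking-worker");
        t.setDaemon(true);
        return t;
    });

    /** Stand-in for a blocking call; returns the id tagged with the thread that ran it. */
    static String blockingLookup(String id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id + "@" + Thread.currentThread().getName();
    }

    /** Runs the blocking call on the dedicated pool instead of the caller's thread. */
    static CompletableFuture<String> lookupAsync(String id) {
        return CompletableFuture.supplyAsync(() -> blockingLookup(id), BLOCKING_POOL);
    }
}
```

The calling thread only schedules the work and returns immediately; the sleep happens on a `blocking-worker` thread, which BlockHound would not treat as a non-blocking thread.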