2
votes

I am trying to get all cf applications using java reactor framework

Flux<ApplicationSummary> appFlux= _cloudFoundryOperations.applications().list();
List<ApplicationSummary> result = appFlux.collectList().block();

This call is being made every 5 minutes to get the applications deployed recently.

it works well for few couple of hours, later it returns "java.lang.OutOfMemoryError". The stack trace is below, How can I do this call efficiently?

"stacktrace": [
  "java.lang.OutOfMemoryError: Direct buffer memory",
  "\tat java.nio.Bits.reserveMemory(Bits.java:711)",
  "\tSuppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: \nAssembly trace from producer [reactor.core.publisher.MonoFlatMap] :\n\treactor.core.publisher.Mono.checkpoint(Mono.java:1877)\n\torg.cloudfoundry.reactor.client.v2.organizations.ReactorOrganizations.list(ReactorOrganizations.java:202)\nError has been observed at the following site(s):\n\t|_ Mono.checkpoint ⇢ at org.cloudfoundry.reactor.client.v2.organizations.ReactorOrganizations.list(ReactorOrganizations.java:202)\n\t|_ Flux.checkpoint ⇢ at org.cloudfoundry.operations.applications.DefaultApplications.list(DefaultApplications.java:330)\nStack trace:",
  "\t\tat java.nio.Bits.reserveMemory(Bits.java:711)",
  "\t\tat java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)",
  "\t\tat java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)",
  "\t\tat io.netty.channel.unix.Buffer.allocateDirectWithNativeOrder(Buffer.java:40)",
  "\t\tat io.netty.channel.epoll.EpollEventArray.<init>(EpollEventArray.java:56)",
  "\t\tat io.netty.channel.epoll.EpollEventLoop.<init>(EpollEventLoop.java:95)",
  "\t\tat io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:151)",
  "\t\tat io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:35)",
  "\t\tat io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)",
  "\t\tat io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)",
  "\t\tat io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)",
  "\t\tat io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)",
  "\t\tat io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:112)",
  "\t\tat io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:99)",
  "\t\tat io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:76)",
  "\t\tat reactor.netty.resources.DefaultLoopEpoll.newEventLoopGroup(DefaultLoopEpoll.java:64)",
  "\t\tat reactor.netty.resources.DefaultLoopResources.cacheNativeServerLoops(DefaultLoopResources.java:252)",
  "\t\tat reactor.netty.resources.DefaultLoopResources.cacheNativeClientLoops(DefaultLoopResources.java:267)",
  "\t\tat reactor.netty.resources.DefaultLoopResources.onClient(DefaultLoopResources.java:199)",
  "\t\tat reactor.netty.tcp.TcpClientRunOn.configure(TcpClientRunOn.java:51)",
  "\t\tat reactor.netty.tcp.TcpClientRunOn.configure(TcpClientRunOn.java:43)",
  "\t\tat reactor.netty.tcp.TcpClientBootstrap.configure(TcpClientBootstrap.java:39)",
  "\t\tat reactor.netty.tcp.TcpClientBootstrap.configure(TcpClientBootstrap.java:39)",
  "\t\tat reactor.netty.tcp.TcpClientSecure.configure(TcpClientSecure.java:53)",
  "\t\tat reactor.netty.tcp.TcpClientDoOn.configure(TcpClientDoOn.java:48)",
  "\t\tat reactor.netty.tcp.TcpClient.connect(TcpClient.java:196)",
  "\t\tat org.cloudfoundry.reactor.util.DefaultSslCertificateTruster.getUntrustedCertificates(DefaultSslCertificateTruster.java:166)",
  "\t\tat org.cloudfoundry.reactor.util.DefaultSslCertificateTruster.trust(DefaultSslCertificateTruster.java:91)",
  "\t\tat org.cloudfoundry.reactor._DefaultConnectionContext.lambda$trust$1(_DefaultConnectionContext.java:155)",
  "\t\tat java.util.Optional.map(Optional.java:215)",
  "\t\tat org.cloudfoundry.reactor._DefaultConnectionContext.trust(_DefaultConnectionContext.java:155)",
  "\t\tat org.cloudfoundry.reactor.DefaultConnectionContext.trust(DefaultConnectionContext.java:23)",
  "\t\tat org.cloudfoundry.reactor.AbstractRootProvider.trust(AbstractRootProvider.java:136)",
  "\t\tat org.cloudfoundry.reactor.AbstractRootProvider.lambda$getRoot$3(AbstractRootProvider.java:70)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.subscribeNextTrigger(MonoDelayUntil.java:210)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.onNext(MonoDelayUntil.java:169)",
  "\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)",
  "\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2317)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)",
  "\t\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2125)",
  "\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:68)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)",
  "\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)",
  "\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4218)",
  "\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134)",
  "\t\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1994)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134)",
  "\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:165)",
  "\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)",
  "\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)",
  "\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)",
  "\t\tat reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1802)",
  "\t\tat reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:323)",
  "\t\tat reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onError(MonoCacheTime.java:346)",
  "\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.onError(MonoDelayUntil.java:175)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.subscribeNextTrigger(MonoDelayUntil.java:213)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.onNext(MonoDelayUntil.java:169)",
  "\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2317)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.onSubscribe(MonoDelayUntil.java:159)",
  "\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)",
  "\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)",
  "\t\tat reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)",
  "\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)",
  "\t\tat reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)",
  "\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8311)",
  "\t\tat reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199)",
  "\t\tat reactor.core.publisher.MonoFlatMapMany.subscribeOrReturn(MonoFlatMapMany.java:49)",
  "\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)",
  "\t\tat reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)",
  "\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)",
  "\t\tat reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)",
  "\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4203)",
  "\t\tat reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)",
  "\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4218)",
  "\t\tat reactor.core.publisher.Mono.block(Mono.java:1677)",
  "\t\tat com.sap.crun.healthapi.cf.CfApi.getApplications(CfApi.java:124)",
  "\t\tat com.sap.crun.healthapi.cf.CfApi.getCloudFoundryMetrics(CfApi.java:92)",
  "\t\tat com.sap.crun.healthapi.cf.GetCloudFoundryMetricsCommand.getCloudFoundryMetrics(GetCloudFoundryMetricsCommand.java:35)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.HealthServiceClient.readCloudFoundryMetrics(HealthServiceClient.java:170)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.FetchHealthDataCommand.processLandscapeHealthData(FetchHealthDataCommand.java:51)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.HealthFactsImporter.run(HealthFactsImporter.java:39)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.HealthFactsImporterAsJob.lambda$run$0(HealthFactsImporterAsJob.java:46)",
  "\t\tat com.sap.cds.services.impl.runtime.CdsRuntimeImpl.runInRequestContext(CdsRuntimeImpl.java:154)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.HealthFactsImporterAsJob.run(HealthFactsImporterAsJob.java:44)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.HealthFactsImporterAsJob$$FastClassBySpringCGLIB$$211f9f83.invoke(<generated>)",
  "\t\tat org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)",
  "\t\tat org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)",
  "\t\tat org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)",
  "\t\tat org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)",
  "\t\tat org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)",
  "\t\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)",
  "\t\tat com.sap.dwc.util.headers.DwcContextTaskDecorator.lambda$decorate$0(DwcContextTaskDecorator.java:33)",
  "\t\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)",
  "\t\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)",
  "\t\tat java.lang.Thread.run(Thread.java:836)"
]

Could someone help here?

1
What are you doing with the collected application list? Do you need to hold all of them in memory?Martin Tarjányi
i do not need to hold it in the memory. Once I receive the application list, I will persist in my own table. .Benu Mariantony

1 Answers

0
votes
Flux<ApplicationSummary> appFlux= _cloudFoundryOperations.applications().list();
List<ApplicationSummary> result = appFlux.collectList().block();

Without knowing more context directly, this looks like an awful lot like it will get a list of all applications. This is contrary to what you state you require:

This call is being made every 5 minutes to get the applications deployed recently.

...and this is likely why you're also running out of memory, as the list likely simply gets way too big to fit in memory all at once. If you genuinely need this complete list every 5 minutes and it won't fit into memory, then there's nothing you can do - you either need to re-architect your application, or add more memory (though the latter is likely not a feasible option after a certain point!)

You don't go into detail about exactly what you're trying to achieve here, but there's two options that may well fit your use case:

  • Ideally, you wouldn't block at all - but if you just need 5 minutes worth of values processed as a list every 5 minutes, then you can simply window the flux in that timespan then do as you please with the result:
_cloudFoundryOperations.applications().list()
        .window(Duration.ofMinutes(5))
        .flatMap(Flux::collectList)
        .doOnNext(list -> {
            //process list
        })
  • As the "least change" approach If you want to block each time as you're doing now, but want to make sure you don't run out of memory, you can use takeLast() with some kind of sensible value to get the last n elements (which will work if you're happy with the last n applications deployed):
_cloudFoundryOperations.applications().list()
        .takeLast(10_000)
        .collectList().block()

...however, remember that blocking generally defeats the purpose of using reactor (unless you're specifically doing it as part of an ongoing migration or because there's no other choice) - so my best advice would be to try to engineer all blocking calls out of your system entirely.