0
votes

I am trying to connect to Azure event hub as described here. But getting below error:

Exception in thread "main" java.lang.NoSuchMethodError: reactor.core.publisher.Flux.retryWhen(Ljava/util/function/Function;)Lreactor/core/publisher/Flux;
    at com.azure.core.amqp.implementation.RetryUtil.withRetry(RetryUtil.java:58)
    at com.azure.core.amqp.implementation.ReactorConnection.getClaimsBasedSecurityNode(ReactorConnection.java:142)
    at com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection.createSession(EventHubReactorAmqpConnection.java:148)
    at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$9(ReactorConnection.java:203)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$10(ReactorConnection.java:197)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at com.azure.core.amqp.implementation.AmqpChannelProcessor$ChannelSubscriber.onNext(AmqpChannelProcessor.java:310)
    at com.azure.core.amqp.implementation.AmqpChannelProcessor.lambda$onNext$0(AmqpChannelProcessor.java:87)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at com.azure.core.amqp.implementation.AmqpChannelProcessor.onNext(AmqpChannelProcessor.java:87)
    at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.onNext(FluxRepeatPredicate.java:85)
    at reactor.core.publisher.Operators$MonoSubscriber.request(Operators.java:1871)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2118)
    at com.azure.core.amqp.implementation.AmqpChannelProcessor.requestUpstream(AmqpChannelProcessor.java:257)
    at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:210)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4046)
    at reactor.core.publisher.Mono.block(Mono.java:1726)
    at com.azure.messaging.eventhubs.EventHubProducerClient.createBatch(EventHubProducerClient.java:127)
    at com.sample.eventhub.eventhub.EventhubDemoApplication.main(EventhubDemoApplication.java:63)

The line at which the error is thrown(EventhubDemoApplication.java:63) has "EventDataBatch batch = producer.createBatch();"

My Pom.xml is as below:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.sample.eventhub</groupId>
    <artifactId>eventhub</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>eventhub-demo</name>
    <description>Demo project for eevnt hub</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-eventhubs</artifactId>
            <version>5.0.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Tried changing the Spring Boot version(2.3.7), then getting the below error:

Exception in thread "restartedMain" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: com.azure.core.amqp.exception.AmqpException: The messaging entity '************************' could not be found. To know more visit https://aka.ms/sbResourceMgrExceptions.  TrackingId:"*********", SystemTracker:**********************, Timestamp:2021-01-11T09:40:57, errorContext[NAMESPACE: ******************, PATH: $cbs, REFERENCE_ID: cbs:receiver, LINK_CREDIT: 0]
    at com.azure.core.amqp.implementation.ExceptionUtil.distinguishNotFound(ExceptionUtil.java:114)
    at com.azure.core.amqp.implementation.ExceptionUtil.amqpResponseCodeToException(ExceptionUtil.java:101)
    at com.azure.core.amqp.implementation.RequestResponseChannel.settleMessage(RequestResponseChannel.java:274)
    at com.azure.core.amqp.implementation.RequestResponseChannel.lambda$new$0(RequestResponseChannel.java:124)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
    at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
    at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler.onDelivery(ReceiveLinkHandler.java:95)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:185)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:82)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:139)
        at reactor.core.publisher.Mono.block(Mono.java:1709)
        at com.azure.messaging.eventhubs.EventHubProducerClient.createBatch(EventHubProducerClient.java:127)
        at com.example.Sender.SenderApplication.main(SenderApplication.java:30)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)

Tried looking into the answer here, but no luck.

Also, I am trying only send part mentioned in the Microsoft official docs. Is that causing the issue? Any input in this regard would be great.

2

2 Answers

2
votes

The NoSuchMethodError is a result of conflicting dependencies in which it resolved a version of project reactor that does not match the one used in azure-messaging-eventhubs. The problem is that it is resolving "reactor-core" version 3.4.1 where-as azure-messaging-eventhubs uses version 3.3.0.RELEASE. In between these releases, Flux.retryWhen's methods were changed to use Retry class rather than a function and a publisher.

You can see this dependency conflict if you execute:

.\mvnw.cmd org.apache.maven.plugins:maven-dependency-plugin:2.10:tree -Dverbose=true -Dincludes=*reactor*

One solution is to upgrade your dependency to 5.3.1.

<dependency>
  <groupId>com.azure</groupId>
  <artifactId>azure-messaging-eventhubs</artifactId>
  <version>5.3.1</version>
</dependency>

Or, you can downgrade your version of spring-boot to one that works with reactor-core 3.3.0.RELEASE (if you want to keep using azure-messaging-eventhubs 5.0.1). Or explicitly select a version of reactor-core in your pom.xml... Though, you may encounter other NoSuchMethodErrors.

0
votes

Thank you @Connie for your response.

Found that legacy event hub was used. Following this Microsoft doc helped me in connecting to the event hub.