I am trying to write a simple program that prints a KStream from a Kafka topic. I keep getting a NullPointerException (NPE) at startup and am completely out of ideas.

I am using the spring-cloud-stream-binder-kafka-streams dependency with the latest Spring Cloud release train, "Finchley.M9".

The code I have written is:

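import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(KafkaStreamsProcessor.class)
public class EventListener {

    // Bound to the "input" binding declared by KafkaStreamsProcessor
    @StreamListener("input")
    public void listen(KStream<String, String> kstream) {
        kstream.print();
    }
}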

The application.properties has:

spring.cloud.stream.bindings.input.destination=slot-events
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.binder.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.binder.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.applicationId=listener
spring.cloud.stream.kafka.streams.binder.configuration.zookeeper.connect=localhost:2181

When I start the service, I keep getting the following error on the console:

2018-03-31 22:57:52.641  INFO 26301 --- [           main] sStreamListenerSetupMethodOrchestrator$1 :  values:
        application.id = default
        application.server =
        bootstrap.servers = [localhost:9092]
        buffered.records.per.partition = 1000
        cache.max.bytes.buffering = 10485760
        client.id =
        commit.interval.ms = 1000
        connections.max.idle.ms = 540000
        default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
        default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
        default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
        default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
        key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        num.standby.replicas = 0
        num.stream.threads = 1
        partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
        poll.ms = 100
        processing.guarantee = at_least_once
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        replication.factor = 1
        request.timeout.ms = 40000
        retry.backoff.ms = 100
        rocksdb.config.setter = null
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        state.cleanup.delay.ms = 600000
        state.dir = /tmp/kafka-streams
        timestamp.extractor = null
        value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
        windowstore.changelog.additional.retention.ms = 86400000
        zookeeper.connect = localhost:2181

2018-03-31 22:57:52.656  INFO 26301 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2018-03-31 22:57:52.685  INFO 26301 --- [           main] ConditionEvaluationReportLoggingListener :

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2018-03-31 22:57:52.693 ERROR 26301 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: java.lang.NullPointerException
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:273) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:154) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
        at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
        at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
        at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_161]
        at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
        at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:777) ~[spring-beans-5.0.4.RELEASE.jar!/:5.0.4.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:868) ~[spring-context-5.0.4.RELEASE.jar!/:5.0.4.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.0.4.RELEASE.jar!/:5.0.4.RELEASE]
        at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:388) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1246) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1234) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at dg.athena.sideprojects.kafkastreampoc.KafkastreampocApplication.main(KafkastreampocApplication.java:15) [classes!/:na]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
        at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
        at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
Caused by: java.lang.NullPointerException: null
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.getkStream(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:294) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:235) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
        ... 24 common frames omitted

Can somebody please suggest what might be causing this?


2 Answers

2
votes

This is a bug that got fixed in the latest snapshot. Can you try upgrading the binder to 1.0.0.BUILD-SNAPSHOT and try again?
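
A minimal sketch of what that upgrade could look like in a Gradle build, assuming the project uses Gradle and resolves snapshots from the Spring snapshot repository. The repository URL and the `implementation` configuration are assumptions on my part; the version string is the one mentioned above, so adjust it to whatever snapshot is current for your release train.

```groovy
// build.gradle (sketch; repository URL and configuration name are assumptions)
repositories {
    mavenCentral()
    // Snapshot builds are not published to Maven Central
    maven { url 'https://repo.spring.io/snapshot' }
}

dependencies {
    // Pin the binder explicitly so it overrides the version managed by the BOM
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:1.0.0.BUILD-SNAPSHOT'
}
```

If the binder version is managed by a Spring Cloud BOM, an explicit version like this should take precedence, but it is worth checking the resolved version with `gradle dependencies` before retesting.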

0
votes

When a project is created with the Spring Starter wizard or Spring Initializr, selecting Spring Cloud Stream also adds a Gradle (or Maven) test dependency:

testImplementation 'org.springframework.cloud:spring-cloud-stream-test-support'

Every time "bootRun" is executed to start the application, an NPE or IllegalStateException is thrown.

When I comment out the above Gradle dependency, the NPE goes away and everything works fine. I have reproduced this in more than six different Spring Boot applications that use Spring Cloud Stream and Kafka Streams.
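
For reference, the change amounts to roughly the following in build.gradle. This is only a sketch: the binder coordinate is assumed from the question rather than copied from a specific build, and the versions are assumed to be managed by the Spring Cloud BOM.

```groovy
// build.gradle (sketch; binder coordinate assumed, versions managed by the BOM)
dependencies {
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'

    // Commented out: with this dependency present, startup failed with the NPE
    // testImplementation 'org.springframework.cloud:spring-cloud-stream-test-support'
}
```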

For now I assume everything is OK, since the code works well with the test dependency removed. However, I never like to proceed without understanding an issue, as it can return to haunt me later on.

Any thoughts/fixes are appreciated.