Trying with spring-cloud-stream-binder-kstream 1.3.0.RELEASE version
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<name>kafka-kstream-processor</name>
<description>kafka-kstream-processor demo application</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Dalston.SR3</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
<version>1.3.0.RELEASE</version>
<!--<version>1.0.0.BUILD-SNAPSHOT</version>-->
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.1.1</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Log trace :
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaProcessorConfiguration' defined in file [/Users/barath/barath-github-projects/spring-cloud-kafka-kstream/kafka-kstream-processor/target/classes/com/barath/app/KafkaProcessorConfiguration.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor': Invocation of init method failed; nested exception is java.lang.IllegalAccessError: class org.springframework.cloud.stream.binder.kstream.$Proxy93 cannot access its superinterface org.springframework.cloud.stream.binder.kstream.KStreamBoundElementFactory$KStreamWrapper
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:564) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at com.barath.app.KafkaKstreamProcessorApplication.main(KafkaKstreamProcessorApplication.java:15) [classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_101]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_101]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_101]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_101]
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-1.5.7.RELEASE.jar:1.5.7.RELEASE]
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor': Invocation of init method failed; nested exception is java.lang.IllegalAccessError: class org.springframework.cloud.stream.binder.kstream.$Proxy93 cannot access its superinterface org.springframework.cloud.stream.binder.kstream.KStreamBoundElementFactory$KStreamWrapper
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1628) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:372) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1173) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1067) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:513) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1078) ~[spring-context-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.invokeSetupMethodOnListenedChannel(StreamListenerAnnotationBeanPostProcessor.java:274) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.access$100(StreamListenerAnnotationBeanPostProcessor.java:73) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$1.doWith(StreamListenerAnnotationBeanPostProcessor.java:170) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:530) ~[spring-core-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:537) ~[spring-core-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:510) ~[spring-core-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(StreamListenerAnnotationBeanPostProcessor.java:141) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:423) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1633) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
... 20 common frames omitted
Caused by: java.lang.IllegalAccessError: class org.springframework.cloud.stream.binder.kstream.$Proxy93 cannot access its superinterface org.springframework.cloud.stream.binder.kstream.KStreamBoundElementFactory$KStreamWrapper
at java.lang.reflect.Proxy.defineClass0(Native Method) ~[na:1.8.0_101]
at java.lang.reflect.Proxy.access$300(Proxy.java:228) ~[na:1.8.0_101]
at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:642) ~[na:1.8.0_101]
at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:557) ~[na:1.8.0_101]
at java.lang.reflect.WeakCache$Factory.get(WeakCache.java:230) ~[na:1.8.0_101]
at java.lang.reflect.WeakCache.get(WeakCache.java:127) ~[na:1.8.0_101]
at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:419) ~[na:1.8.0_101]
at java.lang.reflect.Proxy.newProxyInstance(Proxy.java:719) ~[na:1.8.0_101]
at org.springframework.aop.framework.JdkDynamicAopProxy.getProxy(JdkDynamicAopProxy.java:122) ~[spring-aop-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.getProxy(JdkDynamicAopProxy.java:112) ~[spring-aop-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.aop.framework.ProxyFactory.getProxy(ProxyFactory.java:96) ~[spring-aop-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.cloud.stream.binder.kstream.KStreamBoundElementFactory.createOutput(KStreamBoundElementFactory.java:112) ~[spring-cloud-stream-binder-kstream-1.3.0.RELEASE.jar:1.3.0.RELEASE]
at org.springframework.cloud.stream.binder.kstream.KStreamBoundElementFactory.createOutput(KStreamBoundElementFactory.java:52) ~[spring-cloud-stream-binder-kstream-1.3.0.RELEASE.jar:1.3.0.RELEASE]
at org.springframework.cloud.stream.binding.BindableProxyFactory$2.doWith(BindableProxyFactory.java:148) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:530) ~[spring-core-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:510) ~[spring-core-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.cloud.stream.binding.BindableProxyFactory.afterPropertiesSet(BindableProxyFactory.java:134) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1687) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1624) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
... 46 common frames omitted
Configuration Class:
@Configuration
@EnableBinding(KStreamProcessor.class)
public class KafkaProcessorConfiguration {
@StreamListener(value = "input")
@SendTo("output")
public KStream<?, String> splitStrings(KStream<?, String> input) {
input.print();
System.out.println("PROCESSOR WITH INPUT "+input.toString());
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")));
}
}
application.properties:
spring.cloud.stream.bindings.input.destination=kafka-kstream-source-output
spring.cloud.stream.bindings.output.destination=kafka-kstream-processor-output
P.S: Note same thing works with 1.0.0.BUILD-SNAPSHOT as shared in this repo https://github.com/mbogoevici/spring-cloud-stream-binder-kstream
=================================
Update:
Removed devtools dependency as suggested.
Logs:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaProcessorConfiguration' defined in file [/Users/barath/barath-github-projects/spring-cloud-kafka-kstream/kafka-kstream-processor/target/classes/com/barath/app/KafkaProcessorConfiguration.class]: Initialization of bean failed; nested exception is java.lang.AbstractMethodError: org.springframework.cloud.stream.binder.kstream.KStreamStreamListenerResultAdapter.adapt(Ljava/lang/Object;Ljava/lang/Object;)V
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:564) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at com.barath.app.KafkaKstreamProcessorApplication.main(KafkaKstreamProcessorApplication.java:15) [classes/:na]
Caused by: java.lang.AbstractMethodError: org.springframework.cloud.stream.binder.kstream.KStreamStreamListenerResultAdapter.adapt(Ljava/lang/Object;Ljava/lang/Object;)V
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.invokeSetupMethodOnListenedChannel(StreamListenerAnnotationBeanPostProcessor.java:310) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.access$100(StreamListenerAnnotationBeanPostProcessor.java:73) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$1.doWith(StreamListenerAnnotationBeanPostProcessor.java:170) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:530) ~[spring-core-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:537) ~[spring-core-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:510) ~[spring-core-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(StreamListenerAnnotationBeanPostProcessor.java:141) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:423) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1633) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
... 15 common frames omitted
Certainly looks like dependency version mismatch. hence sharing the dependency tree for reference.