1
votes

I'm trying to start SCDF with docker. It starts fine, but when trying to deploy any stream I get the following errors in the app logs

2017-03-09 15:19:45.864  INFO 62 --- [           main] kafka.utils.VerifiableProperties         : Verifying properties
2017-03-09 15:19:45.864  INFO 62 --- [           main] kafka.utils.VerifiableProperties         : Property client.id is overridden to groupid
2017-03-09 15:19:45.866  INFO 62 --- [           main] kafka.utils.VerifiableProperties         : Property metadata.broker.list is overridden to kafka:9092
2017-03-09 15:19:45.867  INFO 62 --- [           main] kafka.utils.VerifiableProperties         : Property request.timeout.ms is overridden to 10000
2017-03-09 15:19:45.868  INFO 62 --- [           main] kafka.client.ClientUtils$                : Fetching metadata from broker id:0,host:kafka,port:9092 with correlation id 0 for 1 topic(s) Set(test1.time)
2017-03-09 15:19:45.868  INFO 62 --- [           main] kafka.producer.SyncProducer              : Connected to kafka:9092 for producing
2017-03-09 15:19:45.872  INFO 62 --- [           main] kafka.producer.SyncProducer              : Disconnecting from kafka:9092
2017-03-09 15:19:45.889 ERROR 62 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder    : Cannot initialize Binder

java.lang.IllegalArgumentException: Broker cannot be null
    at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.integration.kafka.core.BrokerAddress.<init>(BrokerAddress.java:48) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
    at org.springframework.integration.kafka.core.MetadataCache.getLeader(MetadataCache.java:78) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
    at org.springframework.integration.kafka.core.DefaultConnectionFactory$GetBrokersByPartitionFunction.valueOf(DefaultConnectionFactory.java:271) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
    at org.springframework.integration.kafka.core.DefaultConnectionFactory$GetBrokersByPartitionFunction.valueOf(DefaultConnectionFactory.java:266) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
    at com.gs.collections.impl.block.procedure.MapCollectProcedure.value(MapCollectProcedure.java:51) ~[gs-collections-5.0.0.jar!/:na]
    at com.gs.collections.impl.utility.internal.IteratorIterate.forEach(IteratorIterate.java:648) ~[gs-collections-5.0.0.jar!/:na]
    at com.gs.collections.impl.utility.internal.IterableIterate.forEach(IterableIterate.java:481) ~[gs-collections-5.0.0.jar!/:na]
    at com.gs.collections.impl.utility.Iterate.forEach(Iterate.java:126) ~[gs-collections-5.0.0.jar!/:na]
    at com.gs.collections.impl.utility.Iterate.addToMap(Iterate.java:2493) ~[gs-collections-5.0.0.jar!/:na]
    at com.gs.collections.impl.utility.Iterate.toMap(Iterate.java:2467) ~[gs-collections-5.0.0.jar!/:na]
    at org.springframework.integration.kafka.core.DefaultConnectionFactory.getLeaders(DefaultConnectionFactory.java:98) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$3.doWithRetry(KafkaMessageChannelBinder.java:417) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$3.doWithRetry(KafkaMessageChannelBinder.java:405) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263) ~[spring-retry-1.1.2.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154) ~[spring-retry-1.1.2.RELEASE.jar!/:na]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.ensureTopicCreated(KafkaMessageChannelBinder.java:405) [spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doBindProducer(KafkaMessageChannelBinder.java:296) [spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doBindProducer(KafkaMessageChannelBinder.java:121) [spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:184) [spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.ChannelBindingService.bindProducer(ChannelBindingService.java:113) [spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:206) [spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:57) [spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:51) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:852) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:541) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:766) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
    at org.springframework.boot.SpringApplication.createAndRefreshContext(SpringApplication.java:361) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1191) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1180) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
    at org.springframework.cloud.stream.app.time.source.kafka.TimeSourceKafkaApplication.main(TimeSourceKafkaApplication.java:29) [time-source-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:54) [time-source-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

2017-03-09 15:19:45.890  INFO 62 --- [-zookeeper:2181] org.I0Itec.zkclient.ZkEventThread        : Terminate ZkClient event thread.
2017-03-09 15:19:45.892  INFO 62 --- [           main] org.apache.zookeeper.ZooKeeper           : Session: 0x15ab3a17705000a closed
2017-03-09 15:19:45.893  INFO 62 --- [ain-EventThread] org.apache.zookeeper.ClientCnxn          : EventThread shut down
2017-03-09 15:19:45.893  WARN 62 --- [           main] s.c.a.AnnotationConfigApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-03-09 15:19:45.897  INFO 62 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
2017-03-09 15:19:45.899  INFO 62 --- [           main] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@3e47b405: startup date [Thu Mar 09 15:19:36 UTC 2017]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@7c24a24b
2017-03-09 15:19:45.911 ERROR 62 --- [           main] o.s.boot.SpringApplication               : Application startup failed

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:176) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:51) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:852) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:541) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:766) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
    at org.springframework.boot.SpringApplication.createAndRefreshContext(SpringApplication.java:361) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1191) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1180) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
    at org.springframework.cloud.stream.app.time.source.kafka.TimeSourceKafkaApplication.main(TimeSourceKafkaApplication.java:29) [time-source-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:54) [time-source-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Caused by: org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.ensureTopicCreated(KafkaMessageChannelBinder.java:425) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doBindProducer(KafkaMessageChannelBinder.java:296) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doBindProducer(KafkaMessageChannelBinder.java:121) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:184) ~[spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.ChannelBindingService.bindProducer(ChannelBindingService.java:113) ~[spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:206) ~[spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:57) ~[spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    ... 18 common frames omitted
Caused by: java.lang.IllegalArgumentException: Broker cannot be null
    at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
    at org.springframework.integration.kafka.core.BrokerAddress.<init>(BrokerAddress.java:48) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
    at org.springframework.integration.kafka.core.MetadataCache.getLeader(MetadataCache.java:78) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
    at org.springframework.integration.kafka.core.DefaultConnectionFactory$GetBrokersByPartitionFunction.valueOf(DefaultConnectionFactory.java:271) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
    at org.springframework.integration.kafka.core.DefaultConnectionFactory$GetBrokersByPartitionFunction.valueOf(DefaultConnectionFactory.java:266) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
    at com.gs.collections.impl.block.procedure.MapCollectProcedure.value(MapCollectProcedure.java:51) ~[gs-collections-5.0.0.jar!/:na]
    at com.gs.collections.impl.utility.internal.IteratorIterate.forEach(IteratorIterate.java:648) ~[gs-collections-5.0.0.jar!/:na]
    at com.gs.collections.impl.utility.internal.IterableIterate.forEach(IterableIterate.java:481) ~[gs-collections-5.0.0.jar!/:na]
    at com.gs.collections.impl.utility.Iterate.forEach(Iterate.java:126) ~[gs-collections-5.0.0.jar!/:na]
    at com.gs.collections.impl.utility.Iterate.addToMap(Iterate.java:2493) ~[gs-collections-5.0.0.jar!/:na]
    at com.gs.collections.impl.utility.Iterate.toMap(Iterate.java:2467) ~[gs-collections-5.0.0.jar!/:na]
    at org.springframework.integration.kafka.core.DefaultConnectionFactory.getLeaders(DefaultConnectionFactory.java:98) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$3.doWithRetry(KafkaMessageChannelBinder.java:417) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$3.doWithRetry(KafkaMessageChannelBinder.java:405) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263) ~[spring-retry-1.1.2.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154) ~[spring-retry-1.1.2.RELEASE.jar!/:na]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.ensureTopicCreated(KafkaMessageChannelBinder.java:405) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
    ... 25 common frames omitted

2017-03-09 15:19:45.915  INFO 62 --- [           main] .b.l.ClasspathLoggingApplicationListener : Application failed to start with classpath:...

I'm using spring-cloud-dataflow-server-local-1.1.4. And in application.properties:

spring.cloud.dataflow.applicationProperties.stream.spring.cloud.zookeeper.connect-string=zookeeper:2181
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=kafka:9092
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=zookeeper:2181
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow=15000

On subsequent runs I also get

Caused by: kafka.common.KafkaException: fetching topic metadata for topics [Set(test2.transform)] from broker [List()] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) ~[kafka_2.10-0.8.2.2.jar!/:na]
    ...
2
Hi, @adrian-ber. This is usually observed when the stream applications are unable to lookup and/or connect with the externally running kafka/zk clusters. You'd be supplying the host/port specs to the SCDF-server and that gets propagated to all the apps eventually - see here. Let's start with your setup. Which SCDF-server implementation are you using? Version? How are you configuring kafka/zk settings?Sabby Anandan
I updated the questionAdrian Ber

2 Answers

0
votes

Your application properties look good for connecting. You are using kafka and zookeeper to serve as message pipelines for particular Topic. The message comes with properties that you need to configure, either in docker-compose file(if exists) or in application properties.

For Example, if your aim is for service to act as producer of the message. The message should be configured as topic and topic name, having group if any. This can also be due to incorrect proxy settings in which your topic is not read by application.

Following is kafka topic declaration for buy sell scenario

spring.cloud.stream.kafka.bindings.costPrice.destination:abc.company.costPrice

spring.cloud.stream.kafka.bindings.costPrice.content-type:application/json

spring.cloud.stream.kafka.bindings.sellPrice.destination:abc.company.sellPrice

spring.cloud.stream.kafka.bindings.sellPrice.content-type:application/json
spring.cloud.stream.kafka.bindings.sellPrice.group:sellPriceGroup

The Topic needs to be configured as well. Also See the network ports whether you are able to get zookeeper and kafka running. Also add autoCreateTopics:true option.

I am unaware of your current setup, if in case you are starting zookeeper and kafka properly, before your application.properties is read. you might as well replace zookeeper:2181 with machine-hostname:2181 , and same for kafka. (eg. pavan.network.co.in:2181)

Hope it helps.

0
votes

I don't know why but replacing docker images zookeeper and wurstmeister/kafka:latest with spotify/kafka (containing both Kafka and Zookeeper) made the entire setup work.