1
votes

I'm trying to run a simple Spring Boot Kafka application but I can't make it work. I've followed various tutorials, now I'm implementing this one, but when I start the application this is what happens:

enter image description here

I can write in the console, but the consumer doesn't receive any message.
This is my SpringApplication class:

@SpringBootApplication(scanBasePackages = "com.springmiddleware")
@ComponentScan("com.springmiddleware")
@EnableAutoConfiguration
@EntityScan("com.springmiddleware")
public class SpringMiddlewareApplication implements CommandLineRunner{



    public static void main(String[] args) throws Exception {

        SpringApplication.run(SpringMiddlewareApplication.class, args);

    }

    @Autowired
    private Producer sender;

    @Override 
    public void run (String... strings) {
        sender.send("Hello world");
    }

}

application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:8080

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG

Consumer class, Producer Class and their configurations' class are the same as written in the tutorial.
In my server.properties file I have:

zookeeper.connect=localhost:8080

and in zookeeper.properties:

clientPort=8080

Same port specified in application.yml. Before starting the application, I run

.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

and

.\bin\windows\kafka-server-start.bat config\server.properties

UPDATE

This is the ReceiverConfig class:

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

This is the SenderConfig class:

    @Configuration
public class SenderConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

And this is the method listen that is in the Consumer class

@KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        System.out.println("Received " + message);
    }

Producer class:

@Service
public class Producer {

     @Autowired
     private KafkaTemplate<String, String> kafkaTemplate;

     @Value("${app.topic.foo}")
        private String topic;

     public void send(String message){
            kafkaTemplate.send(topic, message);
        }
}

UPDATE 2

[2019-04-01 17:23:52,492] INFO Established session 0x100435950880000 with negotiated timeout 6000 for client /0:0:0:0:0:0:0:1:60079 (org.apache.zookeeper.server.ZooKeeperServer) [2019-04-01 17:23:52,539] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0x1 zxid:0xef txntype:-1 reqpath:n/a Error Path:/consumers Error:KeeperErrorCode = NodeExists for /consumers (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,555] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0x2 zxid:0xf0 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,555] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0x3 zxid:0xf1 txntype:-1 reqpath:n/a Error Path:/brokers/topics Error:KeeperErrorCode = NodeExists for /brokers/topics (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,555] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0x4 zxid:0xf2 txntype:-1 reqpath:n/a Error Path:/config/changes Error:KeeperErrorCode = NodeExists for /config/changes (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,570] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0x5 zxid:0xf3 txntype:-1 reqpath:n/a Error Path:/admin/delete_topics Error:KeeperErrorCode = NodeExists for /admin/delete_topics (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,570] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0x6 zxid:0xf4 txntype:-1 reqpath:n/a Error Path:/brokers/seqid Error:KeeperErrorCode = NodeExists for /brokers/seqid (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,586] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0x7 zxid:0xf5 txntype:-1 reqpath:n/a Error Path:/isr_change_notification Error:KeeperErrorCode = NodeExists for /isr_change_notification (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,586] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0x8 zxid:0xf6 txntype:-1 reqpath:n/a Error Path:/latest_producer_id_block Error:KeeperErrorCode = NodeExists for /latest_producer_id_block (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,586] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0x9 zxid:0xf7 txntype:-1 reqpath:n/a Error Path:/log_dir_event_notification Error:KeeperErrorCode = NodeExists for /log_dir_event_notification (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,602] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0xa zxid:0xf8 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,602] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0xb zxid:0xf9 txntype:-1 reqpath:n/a Error Path:/config/clients Error:KeeperErrorCode = NodeExists for /config/clients (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,617] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0xc zxid:0xfa txntype:-1 reqpath:n/a Error Path:/config/users Error:KeeperErrorCode = NodeExists for /config/users (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,617] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:create cxid:0xd zxid:0xfb txntype:-1 reqpath:n/a Error Path:/config/brokers Error:KeeperErrorCode = NodeExists for /config/brokers (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:53,564] INFO Got user-level KeeperException when processing sessionid:0x100435950880000 type:multi cxid:0x3a zxid:0xff txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)

1
show the consumer config and producer configs class with listener methodDeadpool
Updated with the classesUsr
@Deadpool mate that line is correct. KafkaTemplate is used within the Producer class's method.Madhu Bhat
Can you please add the Producer class?Madhu Bhat
@MadhuBhat added the producer class in questionUsr

1 Answers

3
votes

In your application.yml, you have specified the zookeeper port instead of the kafka broker port

spring:
  kafka:
    bootstrap-servers: localhost:8080

In the above, you should be defining the port of the kafka broker, ie the value of port= of the server.properties file.

A Spring boot app runs by default on port 8080, so please don't use the same for Zookeeper port, unless you have changed the default port of the Spring boot app.

So in the server.properties, have port=9092 and zookeeper.connect=localhost:2181, and in the application.yml, have as below:

spring:
  kafka:
    bootstrap-servers: localhost:9092

Then in the zookeeper.properties, have clientPort=2181. Then restart the Zookeeper, the Kafka server and the Spring boot app in the same order.

Update:

Newer versions of Kafka uses listeners=PLAINTEXT://localhost:9092 instead of port=9092 in the server.properties file. So try replacing that.