8 votes

--Consumer

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
String groupId = "consumer-tutorial-group";
List<String> topics = Arrays.asList("consumer-tutorial");
props.put("bootstrap.servers", "192.168.1.75:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
    consumer.subscribe(topics);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
    }
} catch (Exception e) {
    System.out.println(e.toString());
} finally {
    consumer.close();
}

I am trying to run the above code. It is a simple consumer that tries to read from a topic, but I get a weird exception and I can't handle it.

org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 1139567, only 45 bytes available

Here is my producer code as well:

--Producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.7:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<>("consumer-tutorial", Integer.toString(i), Integer.toString(i)));

producer.close();

Here are my Kafka commands:

--Start zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

--Start Kafka Server

bin/kafka-server-start.sh config/server.properties

-- Create a topic

bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper 192.168.1.75:2181
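To confirm the topic was actually created with the expected partition count, the same tool can describe it (assuming the same ZooKeeper address as above):

```
bin/kafka-topics.sh --describe --topic consumer-tutorial --zookeeper 192.168.1.75:2181
```

This prints the partition count, replication factor, and the leader for each partition, which is a quick sanity check before debugging the clients.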

--Kafka 0.10.0

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
</dependency>
What client version are you using? – Eaque
I am using Kafka 0.10.0. – user5680169
Oops, I forgot to ask about the brokers! Are you using Kafka 0.10 too? I had the same error because the Kafka 0.10 client is not compatible with a 0.9 broker. – Eaque
I downgraded to client 0.9 and the exception went away, but the code still does not work. – user5680169
Had a similar issue trying to use a 0.9 broker with a 0.10 client. I upgraded my Confluent install and all was resolved. – JB Lovell

3 Answers

9 votes

I also got the same issue when using the kafka_2.11 artifact with version 0.10.0.0, but it was resolved once I changed the Kafka server to 0.10.0.0 (earlier I was pointing to 0.9.0.1). It looks like the server version and your pom version should be in sync.
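One way to keep the two artifacts in sync in the pom is to declare the version once as a Maven property. This is just a sketch; the `kafka.version` property name is a convention I am assuming here, not anything Maven or Kafka requires:

```xml
<properties>
    <!-- keep this aligned with the broker version -->
    <kafka.version>0.10.0.0</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>
```

That way, upgrading or downgrading to match the broker is a one-line change instead of two.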

1 vote

I solved my problem by downgrading to Kafka 0.9.0, but it is still not an efficient solution for me. If someone knows an efficient way to fix this on Kafka 0.10.0, feel free to post it. Until then, this is my solution:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
0 votes

I had the same issue: a client jar compatibility problem, as I was using Kafka server 0.9.0 and Kafka client 0.10.0. Basically, Kafka 0.10.0 introduced a new message format, so the newer client is not able to read the topic metadata from the older broker.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.0.0.RELEASE</version> <!-- downgraded to match the lower version of the Kafka server -->
</dependency>