Properties props = new Properties();
String groupId = "consumer-tutorial-group";
List<String> topics = Arrays.asList("consumer-tutorial");
props.put("bootstrap.servers", "");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
} catch (Exception e) {
} finally {
i am trying to write run the above code,its a simple consumer code which try to read from a topic but i got a weird exception and i can't handle it.
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 1139567, only 45 bytes available
i quote you also my producer code
Properties props = new Properties();
props.put("bootstrap.servers", "");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("consumer-tutorial", Integer.toString(i), Integer.toString(i)));
Here is kafka configs
--Start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
--Start Kafka Server
bin/kafka-server-start.sh config/server.properties
-- Create a topic
bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper
--Kafka 0.10.0