
I am new to Kafka. I am testing Kafka with two instances of ZooKeeper and two broker instances. I created a test topic "topicA". Here is the description of my topic:

Topic:topicA    PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: topicA   Partition: 0    Leader: 2       Replicas: 2     Isr: 2

The topic has one partition on Kafka broker 2, with its only replica on the same broker. I am using the Kafka producer (org.apache.kafka.kafka-clients 0.9.0.1) to send messages to the broker.

Producer Config:

props.put("bootstrap.servers", "***:12900"); // this is kafka broker url
props.put("block.on.buffer.full", "true");
props.put("request.required.acks", "1");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("partition.assignment.strategy", "range");

I am sending 10k messages from the producer:

kafkaProducer.send(new ProducerRecord<String, String>(
        topic, "partitionName", // second argument is the record key, not a partition
        String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));

System.out.println("Sent Message - " + i + " Successfully");

But I am not able to get any messages in my consumer:

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + "---->" + record.value());
    }
}
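
For reference, here is roughly how the whole consumer is wired up (a simplified, self-contained sketch; the broker address is a placeholder and the topic name is the one from above):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "brokerhost:12900"); // placeholder broker address
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        // Without this subscribe (or a manual assign), poll() never returns records.
        kafkaConsumer.subscribe(Collections.singletonList("topicA"));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + "---->" + record.value());
            }
        }
    }
}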

My consumer properties:

bootstrap.servers = *:12900 // this is my kafka broker
group.id = test
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
enable.auto.commit=true
# fast session timeout makes it more fun to play with failover
session.timeout.ms=10000

# These buffer sizes seem to be needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way.  No idea why this happens.
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152

Error in broker 1: BufferUnderflowException is repeated many times.

[Controller-1-to-broker-1-send-thread], Controller 1 epoch 6 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:1;ControllerEpoch:6;CorrelationId:10;ClientId:id_1-host_null-port_12900;Leaders:id:1,host:*,port:12900,id:2,host:*,port:12900;PartitionState:(__consumer_offsets,32) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,16) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,49) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,44) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,28) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,17) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,23) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,7) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,4) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,29) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,35) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,3) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,24) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,41) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,38) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,13) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,8) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,5) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,39) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,36) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,40) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,45) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,15) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,33) -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,37) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,21) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,6) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,11) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,20) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,47) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,2) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,27) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,34) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,9) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,22) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,42) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,14) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,25) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,10) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,48) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,31) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,18) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,19) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,12) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,46) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,43) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,1) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,26) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,30) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2) to broker id:1,host:*,port:12900. Reconnecting to broker.
java.io.IOException: Broken pipe
kafka-request-handler-0]: [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [__consumer_offsets,32],[__consumer_offsets,16],[__consumer_offsets,44],[__consumer_offsets,28],[__consumer_offsets,4],[__consumer_offsets,24],[__consumer_offsets,0],[__consumer_offsets,38],[__consumer_offsets,8],[__consumer_offsets,36],[__consumer_offsets,40],[__consumer_offsets,6],[__consumer_offsets,20],[__consumer_offsets,2],[__consumer_offsets,34],[__consumer_offsets,22],[__consumer_offsets,42],[__consumer_offsets,14],[__consumer_offsets,10],[__consumer_offsets,48],[__consumer_offsets,18],[__consumer_offsets,12],[__consumer_offsets,46],[__consumer_offsets,26],[__consumer_offsets,30]
2016-07-31 06:48:11,045 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-0 with log end offset 0
2016-07-31 06:48:11,054 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,0] in log/kafka_1 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}.
2016-07-31 06:48:11,058 [WARN ][kafka-request-handler-0]: Partition [__consumer_offsets,0] on broker 1: No checkpointed highwatermark is found for partition [__consumer_offsets,0]
2016-07-31 06:48:11,069 [INFO ][kafka-scheduler-4]: Loading offsets from [__consumer_offsets,0]
2016-07-31 06:48:11,072 [INFO ][kafka-scheduler-4]: Finished loading offsets from [__consumer_offsets,0] in 3 milliseconds.
2016-07-31 06:59:31,945 [ERROR][kafka-network-thread-12900-2]: Closing socket for /host because of error
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
    at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
    at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
    at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
    at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
    at kafka.network.Processor.read(SocketServer.scala:450)
    at kafka.network.Processor.run(SocketServer.scala:340)

Log in broker 2 (there are no errors in this broker):

2016-07-31 06:48:10,972 [INFO ][kafka-request-handler-0]: [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [__consumer_offsets,49],[__consumer_offsets,17],[__consumer_offsets,23],[__consumer_offsets,7],[__consumer_offsets,29],[__consumer_offsets,35],[__consumer_offsets,3],[__consumer_offsets,41],[__consumer_offsets,13],[__consumer_offsets,5],[__consumer_offsets,39],[__consumer_offsets,45],[__consumer_offsets,15],[__consumer_offsets,33],[__consumer_offsets,37],[__consumer_offsets,21],[__consumer_offsets,11],[__consumer_offsets,47],[__consumer_offsets,27],[__consumer_offsets,9],[__consumer_offsets,25],[__consumer_offsets,31],[__consumer_offsets,19],[__consumer_offsets,43],[__consumer_offsets,1]
2016-07-31 06:48:10,990 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-29 with log end offset 0
2016-07-31 06:48:10,994 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,29] in log/kafka_2 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}.
2016-07-31 06:48:10,996 [WARN ][kafka-request-handler-0]: Partition [__consumer_offsets,29] on broker 2: No checkpointed highwatermark is found for partition [__consumer_offsets,29]
2016-07-31 06:48:10,998 [INFO ][kafka-scheduler-5]: Loading offsets from [__consumer_offsets,29]
2016-07-31 06:48:11,011 [INFO ][kafka-scheduler-5]: Finished loading offsets from [__consumer_offsets,29] in 13 milliseconds.
2016-07-31 06:48:11,023 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-45 with log end offset 0
2016-07-31 06:48:11,025 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,45] in log/kafka_2 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}.
2016-07-31 06:48:11,913 [INFO ][kafka-request-handler-0]: [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([

1) Why is my consumer unable to get any messages?
2) Do my producer and consumer configs look OK? Should my consumer/producer connect to ZooKeeper instead of connecting directly to the broker?
3) What does "epoch" mean in the controller?
4) What does the following warning imply: "No checkpointed highwatermark is found for partition"?

Btw: ZooKeeper requires an odd number of servers to work well! – Matthias J. Sax

1 Answer


You're evidently using an older Kafka broker version than your client version. Please double-check your Kafka broker and client versions.

The error says the broker cannot parse the JoinGroupRequest from the consumer. Most likely, your consumer is sending a newer version of the JoinGroupRequest than the broker is able to understand.

In general, the version of your Kafka broker should be equal to or higher than the version of the client you use, to avoid such errors.

The solution is either to upgrade your Kafka broker or to downgrade the client you use.
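
If you are unsure which client version is actually on your classpath, you can print it. (AppInfoParser is an internal utility class shipped with kafka-clients, so treat this as a quick diagnostic, not a supported API.)

import org.apache.kafka.common.utils.AppInfoParser;

public class ClientVersion {
    public static void main(String[] args) {
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
    }
}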

Your configs look mostly good, except this one:

props.put("partition.assignment.strategy", "range");

It is useless here: it relates to the old consumer and is most likely ignored by your producer, so you can safely remove it. See the trimmed-down sketch below.
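
For example, a trimmed 0.9 producer setup could look like this sketch (the broker address and payload are placeholders); note that the new Java producer uses "acks" rather than the old "request.required.acks":

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "brokerhost:12900"); // placeholder broker address
        props.put("acks", "1"); // new-producer equivalent of request.required.acks=1
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10000; i++) {
            producer.send(new ProducerRecord<>("topicA", "key-" + i, "message-" + i));
        }
        producer.close(); // flushes buffered records before exiting
    }
}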

The epoch is similar to a version or generation ID of the cluster state. It allows ordinary brokers to stay properly synchronized with the controller and to detect requests coming from a stale controller.
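
As a conceptual illustration only (this is not Kafka's actual code), the epoch check boils down to comparing the epoch carried by a controller request against the highest epoch the broker has seen:

public class EpochCheck {
    public static void main(String[] args) {
        int currentEpoch = 6; // highest controller epoch this broker has seen
        int requestEpoch = 5; // epoch carried by an incoming controller request

        if (requestEpoch < currentEpoch) {
            // A deposed controller is still sending requests; ignore them so
            // stale state cannot overwrite decisions of the current controller.
            System.out.println("Rejecting request from stale controller");
        } else {
            System.out.println("Accepting request; epoch is now " + requestEpoch);
        }
    }
}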