I am new to Kafka.I am testing Kafka with two instance of zoo keeper and two instance of broker.I created a test topic "topicA". Following is description of my topic.
Topic:topicA PartitionCount:1 ReplicationFactor:1 Configs:
Topic: topicA Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic has one partitoin in Kafka broker -2 and only one replica in same broker. I am using Kafka producer(org.apache.kafka.kafka-clients.0.9.0.1) to send messages to broker.
Producer Config:
props.put("bootstrap.servers", "***:12900"); // this is kafka broker url
props.put("block.on.buffer.full", "true");
props.put("request.required.acks", "1");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("partition.assignment.strategy", "range");
I am sending 10k messages from producer.
kafkaProducer.send(new ProducerRecord<String, String>(
topic,"partitionName",
String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
System.out.println("Sent Message - " + i + " Successfully");
But I am not able to get any message in my consumer.
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + "---->" + record.value());
}
}
my consmer prop:
bootstrap.servers = *:12900 // this is my kafka broker
group.id = test
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
enable.auto.commit=true
# fast session timeout makes it more fun to play with failover
session.timeout.ms=10000
# These buffer sizes seem to be needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way. No idea why this happens.
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152
Error in broker 1 : BufferUnderFlowException is repeated so many times.
[Controller-1-to-broker-1-send-thread], Controller 1 epoch 6 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:1;ControllerEpoch:6;CorrelationId:10;ClientId:id_1-host_null-port_12900;Leaders:id:1,host:*,port:12900,id:2,host:*,port:12900;PartitionState:(__consumer_offsets,32) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,16) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,49) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,44) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,28) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,17) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,23) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,7) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,4) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,29) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,35) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,3) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,24) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,41) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,38) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,13) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,8) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,5) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,39) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,36) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,40) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,45) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,15) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,33) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,37) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,21) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,6) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,11) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,20) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,47) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,2) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,27) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,34) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,9) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,22) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,42) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,14) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,25) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,10) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,48) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,31) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,18) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,19) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,12) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,46) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,43) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,1) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,26) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,30) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2) to broker id:1,host:*,port:12900. Reconnecting to broker.
java.io.IOException: Broken pipe
kafka-request-handler-0]: [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [__consumer_offsets,32],[__consumer_offsets,16],[__consumer_offsets,44],[__consumer_offsets,28],[__consumer_offsets,4],[__consumer_offsets,24],[__consumer_offsets,0],[__consumer_offsets,38],[__consumer_offsets,8],[__consumer_offsets,36],[__consumer_offsets,40],[__consumer_offsets,6],[__consumer_offsets,20],[__consumer_offsets,2],[__consumer_offsets,34],[__consumer_offsets,22],[__consumer_offsets,42],[__consumer_offsets,14],[__consumer_offsets,10],[__consumer_offsets,48],[__consumer_offsets,18],[__consumer_offsets,12],[__consumer_offsets,46],[__consumer_offsets,26],[__consumer_offsets,30]
2016-07-31 06:48:11,045 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-0 with log end offset 0
2016-07-31 06:48:11,054 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,0] in log/kafka_1 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}.
2016-07-31 06:48:11,058 [WARN ][kafka-request-handler-0]: Partition [__consumer_offsets,0] on broker 1: No checkpointed highwatermark is found for partition [__consumer_offsets,0]
2016-07-31 06:48:11,069 [INFO ][kafka-scheduler-4]: Loading offsets from [__consumer_offsets,0]
2016-07-31 06:48:11,072 [INFO ][kafka-scheduler-4]: Finished loading offsets from [__consumer_offsets,0] in 3 milliseconds.
2016-07-31 06:59:31,945 [ERROR][kafka-network-thread-12900-2]: Closing socket for /host because of error
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.network.RequestChannel$Request.(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:450)
at kafka.network.Processor.run(SocketServer.scala:340)
Log in broker 2 (There is no error in broker )
2016-07-31 06:48:10,972 [INFO ][kafka-request-handler-0]: [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [__consumer_offsets,49],[__consumer_offsets,17],[__consumer_offsets,23],[__consumer_offsets,7],[__consumer_offsets,29],[__consumer_offsets,35],[__consumer_offsets,3],[__consumer_offsets,41],[__consumer_offsets,13],[__consumer_offsets,5],[__consumer_offsets,39],[__consumer_offsets,45],[__consumer_offsets,15],[__consumer_offsets,33],[__consumer_offsets,37],[__consumer_offsets,21],[__consumer_offsets,11],[__consumer_offsets,47],[__consumer_offsets,27],[__consumer_offsets,9],[__consumer_offsets,25],[__consumer_offsets,31],[__consumer_offsets,19],[__consumer_offsets,43],[__consumer_offsets,1]
2016-07-31 06:48:10,990 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-29 with log end offset 0
2016-07-31 06:48:10,994 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,29] in log/kafka_2 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}.
2016-07-31 06:48:10,996 [WARN ][kafka-request-handler-0]: Partition [__consumer_offsets,29] on broker 2: No checkpointed highwatermark is found for partition [__consumer_offsets,29]
2016-07-31 06:48:10,998 [INFO ][kafka-scheduler-5]: Loading offsets from [__consumer_offsets,29]
2016-07-31 06:48:11,011 [INFO ][kafka-scheduler-5]: Finished loading offsets from [__consumer_offsets,29] in 13 milliseconds.
2016-07-31 06:48:11,023 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-45 with log end offset 0
2016-07-31 06:48:11,025 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,45] in log/kafka_2 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}.
2016-07-31 06:48:11,913 [INFO ][kafka-request-handler-0]: [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([
1) Please let me know why my consumer is unable to get messages ? 2) Is my producer and Consumer config looks ok ? Should my consumer/producer connect to zoo keeper instead of directly connecting to broker ? 3) What does epoch means in controller ? 4) What does following warning implies.. No checkpointed highwatermark is found for partition