I am getting the following error in the logs when trying to publish the first message to a new topic:
[WARN ] [o.a.kafka.clients.NetworkClient][[Producer clientId=producer-1] Error while fetching metadata with correlation id 766890 : {myTopic-1=INVALID_REPLICATION_FACTOR, myTopic-2=INVALID_REPLICATION_FACTOR}] []
The Kafka producer then hangs, with the sending thread stuck at:
"Hashed wheel timer #1" #521 prio=5 os_prio=0 tid=0x00007f932cd7d000 nid=0x199fa in Object.wait() [0x00007f9322b79000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:177)
- locked <0x000000047838b990> (a org.apache.kafka.clients.Metadata)
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:903)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:794)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
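For context, `KafkaProducer.send()` blocks inside `waitOnMetadata` for up to `max.block.ms` (60 000 ms by default in the 0.10 client) on every call, which is why the thread shows up as `TIMED_WAITING`. A minimal sketch (the helper name `withBoundedBlocking` is mine, not a Kafka API) that lowers that cap so the failure surfaces quickly as a `TimeoutException` instead of looking like a hang:

```java
import java.util.Properties;

public class BoundedBlockingSketch {
    // Hypothetical helper: copies the base config and caps how long
    // send()/partitionsFor() may block waiting for topic metadata.
    static Properties withBoundedBlocking(Properties base, long maxBlockMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("max.block.ms", String.valueOf(maxBlockMs));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("acks", "0");
        Properties bounded = withBoundedBlocking(base, 5000L);
        // With this config, a send() that cannot fetch metadata fails after
        // ~5 s with a TimeoutException instead of blocking for the full 60 s.
        System.out.println(bounded.getProperty("max.block.ms"));
    }
}
```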
The Kafka client jar version we are using is 0.10.0.1.
The server.properties on the broker is:
broker.id=3
host.name=<>
port=9092
message.max.bytes=20971520
num.partitions=30
auto.create.topics.enable=true
# Replication configurations
default.replication.factor=2
num.replica.fetchers=2
replica.fetch.max.bytes=20971520
log.dirs=/mnt1/data/kafka/kafka-logs-3
log.retention.hours=48
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.cleanup.interval.mins=30
zookeeper.connect=<>
zookeeper.connection.timeout.ms=1000000
# Socket server configuration
num.io.threads=8
num.network.threads=8
socket.request.max.bytes=20971520
socket.receive.buffer.bytes=20971520
socket.send.buffer.bytes=20971520
queued.max.requests=32
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
inter.broker.protocol.version=0.10.2.0
log.message.format.version=0.10.0
delete.topic.enable=true
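For what it's worth, `INVALID_REPLICATION_FACTOR` on an auto-created topic usually means the cluster could not satisfy `default.replication.factor=2` at creation time, typically because fewer than two brokers were registered as alive when the metadata request arrived. Two sanity checks, assuming the stock Kafka 0.10.x tooling (`<zk>` is a placeholder for the ZooKeeper connect string, as in the config above):

```shell
# List broker ids currently registered in ZooKeeper; at least two must be
# alive for a replication factor of 2 to be satisfiable.
bin/zookeeper-shell.sh <zk> ls /brokers/ids

# Workaround sketch: create the topic explicitly rather than relying on
# auto-creation, so any failure is reported immediately by the tool.
bin/kafka-topics.sh --create --zookeeper <zk> \
  --topic myTopic --partitions 30 --replication-factor 2
```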
Why am I not able to send messages? The topic itself is not being created. The producer properties being used are:
Properties props = new Properties();
props.put("acks", "0");
props.put("retries", "0");
props.put("batch.size", "16384");
props.put("linger.ms", "100");
props.put("buffer.memory", "33554432");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("max.request.size", String.valueOf(ByteUnit.MB.toBytes(10)));
return props;
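One small thing worth double-checking: `ByteUnit` is not a Kafka client class (it presumably comes from another library on the classpath), so it is worth confirming the value it actually produces for `max.request.size`. Assuming it converts binary megabytes, a dependency-free equivalent of that size math:

```java
public class MaxRequestSize {
    // Binary megabytes to bytes, the usual convention for Kafka size configs.
    static long mbToBytes(long mb) {
        return mb * 1024L * 1024L;
    }

    public static void main(String[] args) {
        // 10 MB request ceiling; comfortably below the broker's
        // message.max.bytes=20971520 (20 MB) from server.properties above.
        System.out.println(mbToBytes(10)); // 10485760
    }
}
```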