8
votes

I have a single node, multi (3) broker Zookeeper / Kafka setup. I am using the Kafka 0.10 Java client.

I wrote following simple remote (on a different Server than Kafka) Producer (in the code I replaced my public IP address with MYIP):

Properties config = new Properties();
try {
    config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094");
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    producer = new KafkaProducer<String, byte[]>(config);
    Schema.Parser parser = new Schema.Parser();
    schema = parser.parse(GATEWAY_SCHEMA);
    recordInjection = GenericAvroCodecs.toBinary(schema);
    GenericData.Record avroRecord = new GenericData.Record(schema);
    //Filling in avroRecord (code not here)
    byte[] bytes = recordInjection.apply(avroRecord);

    Future<RecordMetadata> future = producer.send(new ProducerRecord<String, byte[]>(datasetId+"", "testKey", bytes));
    RecordMetadata data = future.get();
} catch (Exception e) {
    e.printStackTrace();
}

My server properties for the 3 brokers look like this (in the 3 different server properties files broker.id is 0, 1, 2 and listeners is PLAINTEXT://:9092, PLAINTEXT://:9093, PLAINTEXT://:9094 and host.name is 10.2.0.4, 10.2.0.5, 10.2.0.6). This is the first server properties file:

broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka1-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

When I execute the code, I get following exception:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
    at com.nr.roles.gateway.GatewayManager.addTransaction(GatewayManager.java:212)
    at com.nr.roles.gateway.gw.service(gw.java:126)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:821)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1158)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1090)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:109)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:119)
    at org.eclipse.jetty.server.Server.handle(Server.java:517)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:308)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:242)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:261)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:75)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
    at java.lang.Thread.run(Thread.java:745)
 Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0

Does anyone know what I am missing? Any help would be appreciated. Thanks a lot

3
I also tried to do the same as above, but with only one broker (on port 9092). I still get the exact same Exception. I made sure the broker and zookeeper ports on remote machine are open and I can telnet them from the Producer machine.Armen

3 Answers

4
votes

I encounter the same problems.

You should change your kafka server.properties to specify ip address. eg:

PLAINTEXT://YOUIP:9093

if not, kafka will use hostname, if the producer can not get the host, it can not send message to kafka even if you can telnet them.

1
votes

Port information in your BOOTSTRAP_SERVERS_CONFIG configuration is incorrect (MYIP:9092).

As you've mentioned in server.properties as "PLAINTEXT://:9093, PLAINTEXT://:9093, PLAINTEXT://:9094".

0
votes

This answer shares some insight. You can increase the request.timeout.ms producer configuration which will allow the client to queue batches for longer before expiring.

You might also want to look into the batch.size and linger.ms configurations and find the optimal that works in your case.