
I am running Kafka 0.10.0 on CDH 5.9; the cluster is Kerberized. What I am trying to do is write messages from a remote machine to my Kafka broker. The cluster (where Kafka is installed) has internal as well as external IP addresses. The machines' hostnames within the cluster resolve to the private IPs, while the remote machine resolves the same hostnames to the public IP addresses. I opened the necessary port 9092 (I am using the SASL_PLAINTEXT protocol) from the remote machine to the Kafka broker and verified this using telnet.

First Step - in addition to the standard properties for the Kafka Broker, I configured the following:

listeners=SASL_PLAINTEXT://0.0.0.0:9092

advertised.listeners=SASL_PLAINTEXT://<hostname>:9092

I am able to start the console consumer with

kafka-console-consumer --new-consumer --topic <topicname> --from-beginning --bootstrap-server <hostname>:9092 --consumer.config consumer.properties

I am able to use my custom producer from another machine within the cluster. Relevant excerpt of producer properties:

security.protocol=SASL_PLAINTEXT 

bootstrap.servers=<hostname>:9092
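For completeness, a fuller sketch of what a remote SASL producer configuration typically needs in a setup like this. The service name below is an assumption (CDH commonly uses `kafka`), not taken from the original setup:

```properties
security.protocol=SASL_PLAINTEXT
bootstrap.servers=<hostname>:9092
# Assumed Kerberos service principal name for the broker:
sasl.kerberos.service.name=kafka
```

The client JVM also needs a JAAS configuration (keytab or ticket cache) supplied via the `java.security.auth.login.config` system property; its exact contents depend on your Kerberos setup.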

I am not able to use my custom producer from the remote machine:

Exception org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for <topicname>-<partition>

using the same producer properties. I can telnet to the Kafka broker from that machine, and /etc/hosts includes the hostnames and public IPs.
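The manual telnet and /etc/hosts checks above can be scripted, which makes it easy to compare what a cluster node and the remote machine each see. A small sketch (hostnames to check are up to you; nothing here is Kafka-specific):

```python
import socket

def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout
    (the same check as the manual telnet test)."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

def resolves_to(host: str) -> str:
    """Return the IPv4 address this machine resolves the hostname to.
    On a split-DNS setup like the one described, cluster nodes and the
    remote machine will return different answers for the same name."""
    return socket.gethostbyname(host)
```

Running `resolves_to("<hostname>")` on a cluster node and on the remote machine makes the private-vs-public resolution difference explicit, which is exactly what determines whether the advertised listener is reachable.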

Second Step - I modified server.properties:

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://<kafkaBrokerInternalIP>:9092
  • consumer & producer within the same cluster still run fine (bootstrap servers are now the internal IP with port 9092)
  • as expected remote producer fails (but that is obvious given that it is not aware of the internal IP addresses)
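The failure in this step follows directly from how Kafka clients bootstrap: the client contacts a bootstrap server only to fetch metadata, then connects to whatever address the broker advertises for all subsequent traffic. A toy model of that handshake (pure Python, no Kafka; every address and hostname below is illustrative, not from the original setup):

```python
# Toy model of the Kafka bootstrap handshake. The metadata response carries
# advertised.listeners, and the client uses *that* address afterwards --
# the bootstrap address is only a doorway.

ADVERTISED_LISTENER = "10.0.0.5:9092"  # broker's internal IP (illustrative)

# Addresses each machine can actually reach (illustrative):
REACHABLE = {
    "cluster-node":   {"10.0.0.5:9092", "broker.example.com:9092"},
    "remote-machine": {"203.0.113.7:9092", "broker.example.com:9092"},
}

def produce(client_host: str, bootstrap: str) -> str:
    # Step 1: contact the bootstrap server to fetch metadata.
    if bootstrap not in REACHABLE[client_host]:
        return "cannot bootstrap"
    # Step 2: metadata points at the advertised listener; connect there.
    if ADVERTISED_LISTENER not in REACHABLE[client_host]:
        return "TimeoutException: metadata fetched, but leader unreachable"
    return "ok"
```

In this model the cluster node succeeds, while the remote machine can bootstrap but then times out against the advertised internal address, which mirrors the behavior observed in this step.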

Third Step - where it gets hairy :(

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://<kafkaBrokerPublicIP>:9092

starting my consumer with

kafka-console-consumer --new-consumer --topic <topicname> --from-beginning --bootstrap-server <hostname>:9092 --consumer.config consumer.properties

gives me a warning, which I don't think is right:

WARN clients.NetworkClient: Error while fetching metadata with correlation id 1 : {<topicname>=LEADER_NOT_AVAILABLE}

starting my consumer with

kafka-console-consumer --new-consumer --topic <topicname> --from-beginning --bootstrap-server <KafkaBrokerPublicIP>:9092 --consumer.config consumer.properties

just hangs after those log messages:

INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
INFO utils.AppInfoParser: Kafka commitId : unknown

It seems it cannot find a coordinator; in the normal flow this would be the next log line:

INFO internals.AbstractCoordinator: Discovered coordinator <hostname>:9092 (id: <someNumber> rack: null) for group console-consumer-<someNumber>.

starting the producer on a cluster node with bootstrap.servers=<hostname>:9092 I observe the same as with the consumer:

WARN NetworkClient:600 - Error while fetching metadata with correlation id 0 : {<topicname>=LEADER_NOT_AVAILABLE}

starting the producer on a cluster node with bootstrap.servers=<KafkaBrokerPublicIP>:9092 I get

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

starting the producer on my remote machine with either bootstrap.servers=<hostname>:9092 or bootstrap.servers=<KafkaBrokerPublicIP>:9092 I get

NetworkClient:600 - Error while fetching metadata with correlation id 0 : {<topicname>=LEADER_NOT_AVAILABLE}

I have been struggling for the past three days to get this to work, but I am out of ideas :/ My understanding is that advertised.listeners serves exactly this purpose, so either I am doing something wrong, or there is something wrong with the machine setup.

Any hints are very much appreciated!


1 Answer


I ran into this issue recently. In my case, I had enabled Kafka ACLs; after disabling them by commenting out these two configuration entries, the problem went away:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:kafka

This thread may also help you: https://gist.github.com/jorisdevrede/a7933a99251452bb1867

As it mentions at the end:

If you only use a SASL_PLAINTEXT listener on the Kafka Broker, you have to make sure that you have set the security.inter.broker.protocol=SASL_PLAINTEXT too, otherwise you will get a LEADER_NOT_AVAILABLE error in the client.
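Putting the advice together, the broker side would look roughly like this. This is a sketch, not a verified CDH 5.9 configuration; the commented-out lines are the ACL settings disabled above:

```properties
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://<hostname>:9092
# With a SASL_PLAINTEXT-only listener, inter-broker traffic must use it too,
# otherwise clients see LEADER_NOT_AVAILABLE:
security.inter.broker.protocol=SASL_PLAINTEXT
# ACL settings commented out as described above:
#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#super.users=User:kafka
```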