
I ran into a strange issue with my Kafka producer. I use Kafka 0.11 for both server and client, with one ZooKeeper node and one Kafka broker node. I created an 'events' topic with 3 partitions:

Topic:events    PartitionCount:3        ReplicationFactor:1     Configs:
        Topic: events   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: events   Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: events   Partition: 2    Leader: 0       Replicas: 0     Isr: 0

In my Java code I create the producer with the following properties:

Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(MAX_BLOCK_MS_CONFIG, 30000);
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(PARTITIONER_CLASS_CONFIG, KafkaCustomPartitioner.class);
this.producer = new KafkaProducer<>(props);

I have also added a callback to the Producer#send() call. It puts each failed message into a queue, which a separate "re-sending" thread iterates over in a loop:

this.producer.send(producerRecord, new ProducerCallback(producerRecord.value(), topic));

private class ProducerCallback implements Callback {
  private final String message;
  private final String topic;

  public ProducerCallback(String message, String topic) {
    this.message = message;
    this.topic = topic;
  }

  @Override
  public void onCompletion(RecordMetadata metadata, Exception ex) {
    if (ex != null) {
        logger.error("Kafka producer error. Topic: " + topic +
                ".Message will be added into failed messages queue.", ex);
        failedMessagesQueue.enqueue(SerializationUtils.serialize(new FailedMessage(topic, message)));
    }
  }
}

private class ResenderThread extends Thread {
    private volatile boolean running = true;

    public void stopGracefully() {
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            try {
                byte[] val = failedMessagesQueue.peek();
                if (val != null) {
                    FailedMessage failedMessage = SerializationUtils.deserialize(val);
                    ProducerRecord<String, String> record;
                    if (topic.equals(failedMessage.getTopic())) {
                        String messageKey = generateMessageKey(failedMessage.getMessage());
                        record = createProducerRecordWithKey(failedMessage.getMessage(), messageKey, failedMessage.getTopic());
                    } else {
                        record = new ProducerRecord<>(failedMessage.getTopic(), failedMessage.getMessage());
                    }
                    try {
                        // Use the enclosing class's producer; inside this inner class, "this" is the thread
                        producer.send(record).get();
                        failedMessagesQueue.dequeue();
                    } catch (Exception e) {
                        logger.debug("Kafka message resending attempt was failed. Topic " + failedMessage.getTopic() +
                                " Partition. " + record.partition() + ". " + e.getMessage());
                    }
                }

                Thread.sleep(200);
            } catch (Exception e) {
                logger.error("Error resending an event", e);
                break;
            }
        }
    }
}

Everything worked fine until I decided to test a Kafka broker kill/restart scenario:

I killed my Kafka broker node and sent 5 messages using my Kafka producer. The following messages were logged by my producer app:

....the application works fine....
// kafka broker was killed
2017-11-10 09:20:44,594 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,646 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,700 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,759 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:44,802 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
// sent 5 messages using the producer. The messages were put into failedMessagesQueue and the "re-sender" thread started resending
2017-11-10 09:20:44,905 ERROR [com.inq.kafka.KafkaETLService] - <Kafka producer error. Topic: events.Message will be added into failed messages queue.>
....
2017-11-10 09:20:45,070 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:45,129 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:45,170 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>
2017-11-10 09:20:45,217 WARN [org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>

// kafka broker was restarted, some strange errors were logged
2017-11-10 09:20:51,103 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 29 : {events=INVALID_REPLICATION_FACTOR}>
2017-11-10 09:20:51,205 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 31 : {events=INVALID_REPLICATION_FACTOR}>
2017-11-10 09:20:51,308 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 32 : {events=INVALID_REPLICATION_FACTOR}>
2017-11-10 09:20:51,114 WARN [org.apache.kafka.clients.producer.internals.Sender] - <Received unknown topic or partition error in produce request on partition events-0. The topic/partition may not exist or the user may not have Describe access to it>
2017-11-10 09:20:51,114 ERROR [com.inq.kafka.KafkaETLService] - <Kafka message resending attempt was failed. Topic events. org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.>
2017-11-10 09:20:52,485 WARN [org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 33 : {events=INVALID_REPLICATION_FACTOR}>
// messages were successfully re-sent and received by the consumer

How can I get rid of these log entries (written roughly every 100 ms while the Kafka broker is down)?

[org.apache.kafka.clients.NetworkClient] - <Connection to node 0 could not be established. Broker may not be available.>

Why do I receive the following errors after the Kafka broker starts up? (I didn't change any server properties and didn't alter the topic.) It seems to me that these errors result from some synchronization process between ZooKeeper and Kafka during broker startup, because after some time the producer successfully resent my messages. Am I wrong?

[org.apache.kafka.clients.NetworkClient] - <Error while fetching metadata with correlation id 29 : {events=INVALID_REPLICATION_FACTOR}>
Received unknown topic or partition error in produce request on partition events-0. The topic/partition may not exist or the user may not have Describe access to it. 
Did you find the solution? Please answer the question if you did. - Chandan
Could be the same problem as here: stackoverflow.com/questions/43315013/… - Michal

1 Answer

 bin/kafka-console-consumer.sh --bootstrap-server tt01.my.tech:9092,tt02.my.tech:9092,tt03.my.tech:9092 --topic wallet-test-topic1 --from-beginning
new message from topic1
hello
hello world
123
hello again
123
what do i publish ?
[2020-02-09 16:57:21,142] WARN [Consumer clientId=consumer-1, groupId=console-consumer-93672] Connection to node 2 (tt02.my.tech/192.168.35.118:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-02-09 16:57:25,999] WARN [Consumer clientId=consumer-1, groupId=console-consumer-93672] Connection to node 2 (tt02.my.tech/192.168.35.118:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-02-09 16:57:58,902] WARN [Consumer clientId=consumer-1, groupId=console-consumer-93672] Connection to node 2 (tt02.my.tech/192.168.35.118:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-02-09 16:57:59,024] WARN [Consumer clientId=consumer-1, groupId=console-consumer-93672] Connection to node 3 (tt03.my.tech/192.168.35.126:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
^CProcessed a total of 7 messages

On the consumer side, this warning is logged when a poll reads no messages.

Basically, a call to .poll ends up calling

handleTimedOutRequests(responses, updatedNow);

If no messages are read in this poll and a request times out, processDisconnection is called, and that is where the warning is logged.

private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
    List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
    for (String nodeId : nodeIds) {
        // close connection to the node
        this.selector.close(nodeId);
        log.debug("Disconnecting from node {} due to request timeout.", nodeId);
        processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
    }

    // we disconnected, so we should probably refresh our metadata
    if (!nodeIds.isEmpty())
        metadataUpdater.requestUpdate();
}

This case in the switch statement of processDisconnection logs the warning. As the message itself says, it applies when the connection to the node could not be established:

        case NOT_CONNECTED:
            log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());

In short, everything will work fine from the producer and consumer perspective once the broker is reachable again, and you should treat this message like any other WARN.
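
If the sheer volume of these warnings is the concern, one option is to make the client retry the connection less often. The sketch below is only an illustration: it assumes the standard client settings reconnect.backoff.ms and reconnect.backoff.max.ms (the latter should be available from 0.11.0 onward), and the class name QuietReconnectConfig and the chosen values are hypothetical. The warning is still logged on each failed attempt, just at a slower cadence.

```java
import java.util.Properties;

public class QuietReconnectConfig {

    // Builds producer properties with a slower reconnect cadence.
    // The backoff values are illustrative assumptions, not recommendations.
    public static Properties build(String brokerUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerUrl);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Initial wait before the client tries to reconnect to a broker.
        props.put("reconnect.backoff.ms", "1000");
        // Upper bound for the backoff, which grows with repeated failures.
        props.put("reconnect.backoff.max.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        // The properties would be passed to new KafkaProducer<>(props).
        Properties props = build("localhost:9092");
        System.out.println(props);
    }
}
```

With a larger backoff the client waits longer between connection attempts, so the "Broker may not be available" line appears less frequently while the broker is down. The trade-off is that the client may also take a little longer to notice that the broker has come back.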