
When authenticating with incorrect credentials, I get the following (expected) message in the log during authentication:

[Consumer clientId=consumer-1, groupId=xxx] Connection to node -1 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

That's all nice, but right now I can only find this information in the log file, while I would like to take action on this message.

I also looked at the Kafka source code where this message is originating from:

(In 1.1: https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, or in trunk: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java)

There is a bit of code like this:

    switch (disconnectState.state()) {
        case AUTHENTICATION_FAILED:
            connectionStates.authenticationFailed(nodeId, now, disconnectState.exception());
            log.error("Connection to node {} failed authentication due to: {}", nodeId, disconnectState.exception().getMessage());
            break;
        case AUTHENTICATE:
            // This warning applies to older brokers which dont provide feedback on authentication failures
            log.warn("Connection to node {} terminated during authentication. This may indicate " +
                    "that authentication failed due to invalid credentials.", nodeId);
            break;

Important note: I override the authentication mechanism in Kafka with a custom module based on SASL_PLAIN. While debugging, I realized that something else failed in this module during authentication, causing a different exception to be thrown from it, which is why the 'older broker' code path above was triggered. But still, I want to take some action on this state.

I have tried creating a listener, like this:

    final ContainerProperties containerProperties = new ContainerProperties("TEST_TOPIC");
    containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    containerProperties.setMessageListener((AcknowledgingMessageListener<GenericRecord, GenericRecord>) (record, ack) -> logger.info("Got record on test topic"));
    KafkaMessageListenerContainer<GenericRecord, GenericRecord> container = new KafkaMessageListenerContainer<>(kafkaConsumerFactory, containerProperties);
    container.start();

But the code doesn't throw any authentication exception, it just keeps printing the above message in the log.

I have also tried creating a simple consumer, like this:

    Consumer<GenericRecord, GenericRecord> consumer = kafkaConsumerFactory.createConsumer();
    consumer.subscribe(Collections.singleton("BX_TEST_TOPIC"));
    try {
        ConsumerRecords<GenericRecord, GenericRecord> poll = consumer.poll(10);
    }
    catch (Exception e) {
        logger.info("Exception during polling", e);
    }

But this code does not throw any exception either; it also just keeps printing the message in the log, even though, according to https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-long-, the poll method can throw an AuthenticationException. This is probably because of the 'older broker' situation described above, which in the source code only produces a warning in the log.
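For completeness: when the broker does report the failure back to the client (which it should with built-in SASL mechanisms on recent brokers), poll surfaces it as an org.apache.kafka.common.errors.AuthenticationException, and it can be caught directly. A minimal sketch, assuming kafka-clients 2.0+ (for poll(Duration)); the bootstrap address, topic, and method name are placeholders of mine:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;

public class AuthAwarePoll {

    // Polls once and classifies the outcome, instead of letting the client
    // retry (and log) forever.
    public static String pollOnce(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "auth-check");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton(topic));
            // An unreachable broker does NOT throw here: poll simply returns
            // an empty record set once the timeout elapses.
            return consumer.poll(Duration.ofSeconds(2)).isEmpty() ? "no-records" : "records";
        } catch (AuthenticationException e) {
            // Only reached when the broker explicitly reports the failure;
            // the 'older broker' warning path never gets here.
            return "auth-failed";
        } catch (KafkaException e) {
            return "other-error";
        }
    }
}
```

Note that this catch block is exactly what does not fire in the 'older broker' case: the client only logs the warning and keeps retrying, so there is no exception to catch from poll.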

So, how can I actually catch an AuthenticationException, or get the connection state in some meaningful way instead of parsing the log, when the server fails the login but does not return an authentication error?

Note that exactly the same problem occurs when authentication is not the issue, but something else fails. For instance, sometimes my Kafka cluster doesn't start correctly, in which case I get this message:

Connection to node -1 could not be established. Broker may not be available.

which I also cannot check for programmatically. Instead the log just keeps filling up with this message as the client gets stuck in an infinite loop trying to connect.
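One workaround for both situations is to probe the cluster out-of-band, e.g. with AdminClient.describeCluster() and a bounded timeout, before starting any listener containers. That turns "broker not available" into a condition the code can test, instead of an endless log loop. A sketch, assuming kafka-clients on the classpath; the method name and timeout values are my own:

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerProbe {

    // Returns true if the cluster answered a describeCluster request
    // within the given timeout, false otherwise.
    public static boolean brokerReachable(String bootstrapServers, long timeoutMs) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (int) timeoutMs);
        try (AdminClient admin = AdminClient.create(props)) {
            // Blocks until the broker responds or the timeout expires.
            admin.describeCluster().nodes().get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            // For SASL failures the cause is an AuthenticationException,
            // so the two failure modes can be told apart by inspecting
            // e.getCause() here.
            return false;
        } catch (TimeoutException e) {
            // Broker unreachable / cluster not started.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

This does not make poll itself throw, but it gives the application a single place to decide whether connecting is worth attempting at all.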


1 Answer


You can reduce the looping by increasing the retry backoff, and you can check the status of the broker up front. For example, with properties like these...

spring.kafka.producer.properties.retry.backoff.ms=1000
spring.kafka.producer.properties.max.block.ms=10000
spring.kafka.bootstrap-servers=localhost:9096

and

@Bean
public ApplicationRunner runner(ProducerFactory<?, ?> producerFactory) {
    return args -> {
        try (Producer<?, ?> producer = producerFactory.createProducer()) {
            producer.partitionsFor("foo");
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    };
}

and

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 10000 ms.

I reduced max.block.ms because it defaults to 60 seconds.