6
votes

I tried to implement a AuthenticateCallbackHandler ready to be released in Kafka 2.0.0 to no avail - is this a setup in which it should work?

On https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers I read:

Use an external authentication server for SASL/PLAIN authentication using the SaslServer implementation for PLAIN included in Kafka

Define a new class that implements AuthenticateCallbackHandler which handles NameCallback and PlainAuthenticateCallback and add the class to the broker's sasl.server.callback.handler.class property. A single instance of this callback handler will be created for the broker. The configured callback handler is responsible for validating the password provided by clients and this may use an external authentication server.

So basically what I did was create a class:

package com.example;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;

public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {
    private List<AppConfigurationEntry> jaasConfigEntries;

    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        this.jaasConfigEntries = jaasConfigEntries;
    }

    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback: callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    protected boolean authenticate(String username, char[] password) throws IOException {
        return username != null && username.equals("test") && new String(password).equals("test");
    }

    public void close() throws KafkaException {
    }
}

Build a jar and make it available for Kafka like in this docker-compose.yml:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0-beta1-1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SASL_ENABLED: "false"

  kafka:
    image: confluentinc/cp-kafka:5.0.0-beta1-1
    depends_on:
      - zookeeper
    volumes:
      - ./security:/etc/kafka/secrets
      - ./jars:/etc/kafka/jars
    ports:
      - "9092:9092"
    environment:
      CLASSPATH: /etc/kafka/jars/*
      ZOOKEEPER_SASL_ENABLED: "false"
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
      KAFKA_SUPER_USERS: User:admin
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf
      KAFKA_SASL_SERVER_CALLBACK_HANDLER_CLASS: com.example.CustomAuthenticateCallbackHandler

broker_jaas.conf:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  ;
};

cli-client.properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="test" \
  password="test";

Then I test it using:

 > kafka-console-producer --broker-list kafka:9092 --topic test-topic --producer.config /etc/kafka/secrets/cli-client.properties
 This is a message
 This is another message

However I get an error that producer is unable to authenticate:

[2018-05-17 19:49:06,955] ERROR [Producer clientId=console-producer] Connection to node -1 failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)

1

1 Answers

1
votes

I got the following answer from the Kafka mailing list, will try next week:

This feature will be part upcoming Kafka 2.0.0 release.

Doc PR is here : https://github.com/apache/kafka/pull/4890

configs here: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java#L57