I tried to implement an AuthenticateCallbackHandler, a feature due to be released in Kafka 2.0.0, to no avail. Is this a setup in which it should work?
On https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers I read:
Use an external authentication server for SASL/PLAIN authentication using the SaslServer implementation for PLAIN included in Kafka
Define a new class that implements AuthenticateCallbackHandler which handles NameCallback and PlainAuthenticateCallback and add the class to the broker's sasl.server.callback.handler.class property. A single instance of this callback handler will be created for the broker. The configured callback handler is responsible for validating the password provided by clients and this may use an external authentication server.
So basically what I did was create a class:
package com.example;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;

public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {
    private List<AppConfigurationEntry> jaasConfigEntries;

    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        this.jaasConfigEntries = jaasConfigEntries;
    }

    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback: callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    protected boolean authenticate(String username, char[] password) throws IOException {
        return username != null && username.equals("test") && new String(password).equals("test");
    }

    public void close() throws KafkaException {
    }
}
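To rule out the handler logic itself, the callback flow can be exercised outside the broker. The following is only a sketch: `HandlerFlowSketch` and `FakePlainAuthenticateCallback` are hypothetical stand-ins (so that it runs without kafka-clients on the classpath), not Kafka classes, and the callback order (name first, then password) is an assumption that matches what the handler above expects.

```java
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;

public class HandlerFlowSketch {

    // Hypothetical stand-in for Kafka's PlainAuthenticateCallback, so the
    // sketch compiles with the JDK alone.
    static final class FakePlainAuthenticateCallback implements Callback {
        private final char[] password;
        private boolean authenticated;

        FakePlainAuthenticateCallback(char[] password) {
            this.password = password;
        }

        char[] password() {
            return password;
        }

        boolean authenticated() {
            return authenticated;
        }

        void authenticated(boolean authenticated) {
            this.authenticated = authenticated;
        }
    }

    // Same control flow as CustomAuthenticateCallbackHandler.handle(...),
    // with the stand-in callback type swapped in.
    static void handle(Callback[] callbacks) throws UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback) {
                username = ((NameCallback) callback).getDefaultName();
            } else if (callback instanceof FakePlainAuthenticateCallback) {
                FakePlainAuthenticateCallback plain = (FakePlainAuthenticateCallback) callback;
                plain.authenticated(authenticate(username, plain.password()));
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    // Hard-coded credential check, as in the question.
    static boolean authenticate(String username, char[] password) {
        return "test".equals(username) && "test".equals(new String(password));
    }

    // Builds the two callbacks (name, then password) and reports the outcome.
    public static boolean check(String username, String password) {
        FakePlainAuthenticateCallback plain =
                new FakePlainAuthenticateCallback(password.toCharArray());
        try {
            handle(new Callback[] { new NameCallback("username", username), plain });
        } catch (UnsupportedCallbackException e) {
            throw new IllegalStateException(e);
        }
        return plain.authenticated();
    }

    public static void main(String[] args) {
        System.out.println(check("test", "test"));   // prints true
        System.out.println(check("test", "wrong"));  // prints false
    }
}
```

If this prints true and then false, the credential check behaves as intended in isolation, which would point towards the broker not invoking the handler rather than towards the handler code.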
I then built a jar and made it available to Kafka, as in this docker-compose.yml:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0-beta1-1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SASL_ENABLED: "false"
  kafka:
    image: confluentinc/cp-kafka:5.0.0-beta1-1
    depends_on:
      - zookeeper
    volumes:
      - ./security:/etc/kafka/secrets
      - ./jars:/etc/kafka/jars
    ports:
      - "9092:9092"
    environment:
      CLASSPATH: /etc/kafka/jars/*
      ZOOKEEPER_SASL_ENABLED: "false"
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
      KAFKA_SUPER_USERS: User:admin
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf
      KAFKA_SASL_SERVER_CALLBACK_HANDLER_CLASS: com.example.CustomAuthenticateCallbackHandler
broker_jaas.conf:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    ;
};
cli-client.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="test" \
    password="test";
Then I test it using:
> kafka-console-producer --broker-list kafka:9092 --topic test-topic --producer.config /etc/kafka/secrets/cli-client.properties
This is a message
This is another message
However, I get an error saying that the producer is unable to authenticate:
[2018-05-17 19:49:06,955] ERROR [Producer clientId=console-producer] Connection to node -1 failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)