8
votes

I'm using AWS MSK and want to enable ACLs, but I'm unable to create a topic when ACLs are turned on. I'm using the command-line tools for all operations. Here's a summary of what I'm doing:

  • Create a fresh cluster
  • Create a topic - this works fine
  • Turn on ACL for client1 on resource=CLUSTER and operation=ALL
  • Create topic using AdminClient (by providing the --bootstrap-server option) - this gives a timeout exception
  • Re-try creating the same topic - this gives an error saying topic already exists
  • List topics using AdminClient - this returns no topics
  • Create topic using Zookeeper connect - this works
  • List topics using Zookeeper connect - this returns all the topics I've created (even those that timed out)

So the issue seems to be that the topic is getting created in Zookeeper but the broker can't access it, presumably due to some ACL rule that I'm missing.

Raw output of the commands that I've run:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1

Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
        at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
        at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
 (kafka.admin.TopicCommand$)

Running the same command again:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1

Error while executing topic command : org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
[2019-09-30 17:25:38,266] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
        at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
        at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
 (kafka.admin.TopicCommand$)

List of topics via AdminClient:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties --list


List of topics via Zookeeper connect:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --zookeeper $ZK --command-config ~/client1.properties --list
test
test2
test3
test4
test5

Here are my ACL rules:

Current ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`:
        (principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=--operation=All, patternType=LITERAL)`:
        (principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)

What am I missing?


1 Answer

4
votes

I don't think this has anything to do with AWS MSK; it is rather an issue with your secured Kafka cluster configuration. Both client actions (subscribers/producers) and inter-broker actions require authorization in a secured cluster. You'd have the same issue in a non-managed Kafka cluster.

The recommendation is to set up a "superuser" user (I'd call them service accounts) on the servers and then give these "superuser" users ACLs that allow the inter-broker interactions you need for your cluster. The exact ACLs you need are going to vary depending on your use cases and security preferences.

In server.properties you'd add an entry like super.users=User:BrokerService; this is documented at https://docs.confluent.io/current/kafka/authorization.html#kafka-auth-superuser. The documentation uses Alice and Bob as example superuser names, which seems confusing to me. Pick whatever user name makes sense for you.
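
As a rough sketch of what that entry could look like on a self-managed broker, the snippet below appends the relevant lines to a properties file. The CONFIG variable is a stand-in introduced here (it defaults to a temporary file so the snippet is safe to try), the authorizer class name is the one shipped with Kafka 2.x releases such as the 2.2.1 tools used in the question, and a managed service may not let you edit this file directly.

```shell
# Assumption: CONFIG points at the broker's server.properties.
# It defaults to a temp file here so nothing real is modified.
CONFIG="${CONFIG:-$(mktemp)}"

cat >> "$CONFIG" <<'EOF'
# Enable ACL checks (authorizer class name for Kafka 2.x brokers).
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Principals listed here bypass ACL checks; separate several with semicolons.
super.users=User:BrokerService
EOF

# Show the entry that was just written.
grep '^super.users=' "$CONFIG"
```

The broker has to be restarted to pick up the change, and the principal name must match exactly what the broker authenticates as (with TLS client authentication that is typically the certificate's distinguished name).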

Then you need to set up an ACL whose principal is the "superuser" user you created above, e.g. principal=User:BrokerService. The ACL should grant whatever permissions the brokers need. It sounds like your immediate use case is to ALLOW READ on all topics. You'll probably need other ACLs for inter-broker communication as well, but I can't tell you exactly what you need without more information about what you want to do.

For example, this command sets up the ACL:

kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:BrokerService --operation All --topic '*' --cluster
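
After adding an ACL it may be worth listing what was actually stored. The sketch below assumes the same ZooKeeper address as the command above; the ZK variable and the run helper (which only prints each command unless DRY_RUN=0 is set) are illustrative additions, not part of the Kafka tooling. With the Apache Kafka distribution the script is named kafka-acls.sh.

```shell
# Assumption: ZK holds the ZooKeeper connect string used by the authorizer.
ZK="${ZK:-localhost:2181}"

# Print each command by default; set DRY_RUN=0 to actually execute it.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# List ACLs bound to the wildcard topic resource, then to the cluster resource.
run kafka-acls --authorizer-properties "zookeeper.connect=$ZK" --list --topic '*'
run kafka-acls --authorizer-properties "zookeeper.connect=$ZK" --list --cluster
```

If the listing shows a TOPIC resource whose name looks like a command-line flag (the question's output shows name=--operation=All), that would suggest the --topic option swallowed the next argument when the ACL was added, so the rule does not cover the topics you intended.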

More options for setting up ACLs, along with a description of your exact problem, are documented at https://docs.confluent.io/current/kafka/authorization.html#acl-format

Again, please research some more, or edit your question if you are looking for an exact configuration to use here, as the ACLs you choose have security and use-case implications.