I'm trying to set up a secure Kafka cluster and am having some difficulty with ACLs.

The Confluent security guide for Kafka Streams (https://docs.confluent.io/current/streams/developer-guide/security.html) simply states that the Cluster Create ACL has to be given to the principal... but it doesn't say anything about how to actually handle the internal topics.

Through research and experimentation, I've determined (for Kafka version 1.0.0):

  1. Wildcards cannot be combined with literal text in ACL topic names. For example, since all internal topics are prefixed with the application id, my first thought was to apply an ACL to topics matching '<application.id>-*'. This doesn't work.
  2. Topics created by the Streams API do not get read/write access granted to the creator automatically.

Are the exact names of the internal topics predictable and consistent? In other words, if I run my application on a dev server, will the exact same topics be created on the production server when run? If so, then I can just add ACLs derived from dev before deploying. If not, how should the ACLs be added?

1 Answer

Are the exact names of the internal topics predictable and consistent? In other words, if I run my application on a dev server, will the exact same topics be created on the production server when run?

Yes, you'll get the exact same topic names from run to run. The DSL generates processor names with a function that looks like this:

public String newProcessorName(final String prefix) {
    return prefix + String.format("%010d", index.getAndIncrement());
}

(where index is just an incrementing integer). Those processor names are then used to create repartition topics with a function that looks like this (the parameter called name is a processor name generated as above):

static <K1, V1> String createReparitionedSource(final InternalStreamsBuilder builder,
                                                final Serde<K1> keySerde,
                                                final Serde<V1> valSerde,
                                                final String topicNamePrefix,
                                                final String name) {
    Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
    Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
    Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
    Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
    String baseName = topicNamePrefix != null ? topicNamePrefix : name;

    String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
    String sinkName = builder.newProcessorName(SINK_NAME);
    String filterName = builder.newProcessorName(FILTER_NAME);
    String sourceName = builder.newProcessorName(SOURCE_NAME);

    builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
    builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
        @Override
        public boolean test(final K1 key, final V1 value) {
            return key != null;
        }
    }, false), name);

    builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
        null, filterName);
    builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
        keyDeserializer, valDeserializer, repartitionTopic);

    return sourceName;
}

If you don't change your topology (for example, if you don't change the order in which it's built), you'll get the same results no matter where the topology is constructed, presuming you're using the same version of Kafka Streams.
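To make the determinism concrete, here is a minimal stand-alone sketch that imitates the counter-based naming shown above. It does not use Kafka Streams itself: the class ProcessorNameDemo and its build helper are hypothetical, and the "KSTREAM-..." prefixes are assumed values for the SOURCE_NAME, FILTER_NAME and SINK_NAME constants.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the DSL's internal builder; only the naming logic is imitated.
public class ProcessorNameDemo {
    // Each builder starts counting from zero, as a fresh topology would.
    private final AtomicInteger index = new AtomicInteger(0);

    // Same shape as the newProcessorName function quoted above.
    public String newProcessorName(final String prefix) {
        return prefix + String.format("%010d", index.getAndIncrement());
    }

    // "Builds a topology" by requesting names in the given order from a fresh builder.
    static List<String> build(final String... prefixes) {
        final ProcessorNameDemo builder = new ProcessorNameDemo();
        final List<String> names = new ArrayList<>();
        for (final String prefix : prefixes) {
            names.add(builder.newProcessorName(prefix));
        }
        return names;
    }

    public static void main(final String[] args) {
        // Assumed prefix values; the real constants live in the Kafka Streams source.
        final List<String> dev = build("KSTREAM-SOURCE-", "KSTREAM-FILTER-", "KSTREAM-SINK-");
        final List<String> prod = build("KSTREAM-SOURCE-", "KSTREAM-FILTER-", "KSTREAM-SINK-");
        final List<String> reordered = build("KSTREAM-FILTER-", "KSTREAM-SOURCE-", "KSTREAM-SINK-");

        System.out.println(dev);
        System.out.println("same order, same names: " + dev.equals(prod));
        System.out.println("different order, same names: " + dev.equals(reordered));
    }
}
```

The names depend only on the order of the calls, which is why reordering the topology (or upgrading to a version that builds it differently) can change them.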

If so, then I can just add ACLs derived from dev before deploying. If not, how should the ACLs be added?

I have not used ACLs, but since these are just regular topics, I imagine you can apply ACLs to them like any other topic. The security guide does mention:

When applications are run against a secured Kafka cluster, the principal running the application must have the ACL --cluster --operation Create set so that the application has the permissions to create internal topics.

I've been wondering about this myself, though, so if I am wrong I am guessing someone from Confluent will correct me.
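If you want to write down the exact topic names to grant ACLs on before deploying, something like the sketch below could help. It is only an illustration: the class InternalTopicNames, the application id and the base names are all hypothetical, and it assumes the convention that internal topics are named <application.id>-<name>-repartition or <application.id>-<store name>-changelog. It would be worth checking its output against the topics your application actually created on dev.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; it only concatenates strings and does not talk to a broker.
public class InternalTopicNames {
    // Assumed suffixes for the two kinds of internal topics.
    static final String REPARTITION_SUFFIX = "-repartition";
    static final String CHANGELOG_SUFFIX = "-changelog";

    // Assumed convention: <application.id>-<base name>-repartition
    static String repartitionTopic(final String applicationId, final String baseName) {
        return applicationId + "-" + baseName + REPARTITION_SUFFIX;
    }

    // Assumed convention: <application.id>-<store name>-changelog
    static String changelogTopic(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + CHANGELOG_SUFFIX;
    }

    public static void main(final String[] args) {
        final String applicationId = "my-streams-app"; // hypothetical application.id

        final List<String> topics = new ArrayList<>();
        topics.add(repartitionTopic(applicationId, "KSTREAM-MAP-0000000001")); // hypothetical processor name
        topics.add(changelogTopic(applicationId, "counts-store"));             // hypothetical store name

        // One exact topic name per line, ready to be used when adding per-topic ACLs.
        for (final String topic : topics) {
            System.out.println(topic);
        }
    }
}
```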