Java: OpenJDK 11, Kafka: 2.2.0, Kafka Streams library: 2.3.0

I am trying to deploy my Kafka Streams application in a Docker container, and it fails with a TopicAuthorizationException while trying to create an internal state store. It works fine locally. The main difference is that on the server the application connects to a server-deployed Kafka cluster and authenticates with the usual Kerberos auth. I fail to understand the link between authentication and the local state stores.

My topology looks like this:

StreamsBuilder builder = new StreamsBuilder();

        //We stream from the source topic
        KStream<String, EnrichedMessagePayload> sourceMessagesStream = builder.stream(sourceTopic, Consumed
                .with(Serdes.serdeFrom(String.class), INPUT_SERDE));

        //We group per room and window
        TimeWindowedKStream<String, EnrichedMessagePayload> windowed = sourceMessagesStream
                .groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).grace(Duration.ZERO));

        //We make them a list
        KStream<Windowed<String>, WindowedMessages> grouped = windowed
                .aggregate(WindowedMessages::new,
                        (key, value, aggregate) -> aggregate.add(value),
                        Materialized.with(Serdes.String(), Serdes.serdeFrom(windowSerializer, windowSerializer)))
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream();

        //Filter
        KStream<Windowed<String>, FilterResult> filtered = grouped
                .mapValues((readOnlyKey, value) -> filterWindow(value.getMessages()));

        //Re map to its original form
        KStream<String, OutputPayload> reduced = filtered
                .flatMap((KeyValueMapper<Windowed<String>, WindowedMessages, Iterable<KeyValue<String, OutputPayload>>>) (key, value) -> value
                        .getMessages()
                        .stream().map(payload -> new KeyValue<>(key.key(), payload))
                        .collect(toList()));


        //Target topic
        reduced.to(sinkTopic, Produced
                .with(Serdes.serdeFrom(String.class), SERDE));

        return builder.build();

It receives a stream of messages, windows it, aggregates all the messages per window, keeps only the final version of each list by using 'Suppressed', and then flatMaps the result to forward it to another topic.

Every time, I get this exception:

Error message was: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
2019-10-09 06:44:03.255 +0000 ERROR [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] [StreamThread.java:777] - stream-thread [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: - [rapid_r-live-message-filterer-0-0-1-snapshot-10.1e842f1a-ea60-11e9-9c7d-024298932744] - [] - []
org.apache.kafka.streams.errors.StreamsException: Could not create topic filterer-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog.
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:212)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:226)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:104)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:971)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:618)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:622)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:107)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]

1 Answer

The problem is not "authentication" but "authorization". Look at your log message: it says "Not authorized to access topics". As far as I can see, you are not authorized to create the internal topic 'filterer-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog' that backs your local suppress state store. By default, the state stores included in Kafka Streams are backed by a changelog topic on the Kafka brokers. These internal topics are used during failover to restore the local state stores. They are created automatically by the Kafka Streams application, so the application needs the appropriate permissions to create them.

See https://kafka.apache.org/23/documentation/streams/developer-guide/security.html#id1 for more information. There it says "the principal running the application must have the ACL set so that the application has the permissions to create, read and write internal topics."
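
To make the naming concrete, here is a minimal sketch of how the failing topic name relates to your application. Kafka Streams names internal topics "<application.id>-<store or operator name>-<suffix>", so every internal topic of one application shares the prefix "<application.id>-". The class and method names below are hypothetical helpers for illustration, not part of the Kafka API, and the application.id "filterer" is an assumption inferred from the topic name in your stack trace.

```java
public class InternalTopicNames {

    // Builds a changelog topic name following the Kafka Streams convention
    // "<application.id>-<store name>-changelog" (hypothetical helper).
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Models a prefixed resource pattern: it matches every topic whose
    // name starts with the given prefix (hypothetical helper).
    static boolean coveredByPrefix(String topic, String aclPrefix) {
        return topic.startsWith(aclPrefix);
    }

    public static void main(String[] args) {
        // Assumption: application.id is "filterer", as the topic name suggests.
        String applicationId = "filterer";
        String topic = changelogTopic(applicationId, "KTABLE-SUPPRESS-STATE-STORE-0000000005");

        // The topic from the stack trace.
        System.out.println(topic);
        // Whether a pattern on "filterer-" would cover it.
        System.out.println(coveredByPrefix(topic, applicationId + "-"));
    }
}
```

If your brokers support prefixed ACLs, a single rule on the "<application.id>-" prefix granting create, read and write to the application's principal should cover the changelog topic above as well as any repartition topics, so you would not need to list each generated name.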