7 votes

I'm developing a Kafka Streams application that reads messages from an input Kafka topic, filters out unwanted data, and pushes the rest to an output Kafka topic.

Kafka Stream Configuration:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {

    Map<String, Object> streamsConfiguration = new HashMap<>();
    streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    // acks is a String-typed producer config; an Integer -1 fails validation, so use "all" (equivalent to -1)
    streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
    streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    // SSL/SASL settings for the secured cluster
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
    streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    streamsConfiguration.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

    return new KafkaStreamsConfiguration(streamsConfiguration);
}

KStream Filter Logic:

@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {

    KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
    // Log every record read from the input topic
    stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " *****Message From Input Topic: " + key + ": " + value));

    // Keep only the records that satisfy the filter predicate
    KStream<String, String> filteredDocument = stream.filter((k, v) -> filterCondition.test(k, v));
    filteredDocument.to(kafkaConsumerProperties.getProducerTopic(), Produced.with(Serdes.String(), Serdes.String()));

    // Log the records that passed the filter
    filteredDocument.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " #####Filtered Document: " + key + ": " + value));
    return stream;
}
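
The filterCondition predicate is not shown above; a minimal sketch of what it might look like, assuming it is a BiPredicate over the record key and JSON value (the "eventType" field and the accepted "QC" value are illustrative placeholders, not from the question):

import java.util.function.BiPredicate;

public class FilterConditions {
    // Hypothetical predicate: keep only records whose JSON value contains
    // the expected event type; adjust the field and value to your payload.
    public static final BiPredicate<String, String> FILTER_CONDITION =
            (key, value) -> value != null && value.contains("\"eventType\":\"QC\"");
}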

While starting the above Spring-based Kafka Streams application, I got the exception below.

2019-05-27T07:58:36.018-0500 ERROR stream-thread [QC-NormalizedEventProcessor-v1.0.0-e9cb1bed-3d90-41f1-957a-4fc7efc12a02-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: QC-NormalizedEventProcessor-v1.0.0

Our Kafka infra team has granted the necessary permissions for the "group.id". Using this same group ID, I can consume messages with other Kafka consumer applications, and I chose the "application.id" name freely. We are not adding/updating the "application.id" in the Kafka Access Control List.

I'm really not sure whether we need to grant any permission for the "application.id", or whether I'm missing something in the Kafka Streams configuration. Please advise.

Please note: I have tried the Kafka Streams configuration both with and without "group.id", and I get the same exception every time.

Thanks! Bharathiraja Shanmugam


2 Answers

2 votes

I am not at my desk, but I think Streams sets the group.id to the application.id.
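
One way to confirm this (not from the original answer; the broker address is a placeholder, and against a SASL_SSL cluster you would also need the matching security settings) is to list the consumer groups with the AdminClient and check that the group name matches the application.id rather than the configured group.id:

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ListGroups {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072"); // placeholder broker
        try (AdminClient admin = AdminClient.create(props)) {
            // For a running Streams app this prints the application.id
            // (e.g. QC-NormalizedEventProcessor-v1.0.0), not "abcd".
            admin.listConsumerGroups().all().get()
                 .forEach(listing -> System.out.println(listing.groupId()));
        }
    }
}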

1 vote

We need to grant access for the application.id as well. For more information, please refer to https://docs.confluent.io/current/streams/developer-guide/security.html

Required ACL setting for secure Kafka clusters
Kafka clusters can use ACLs to control access to resources (like the ability to create topics), and for such clusters each client, including Kafka Streams, is required to authenticate as a particular user in order to be authorized with appropriate access. In particular, when Streams applications are run against a secured Kafka cluster, the principal running the application must have the ACL set so that the application has the permissions to create internal topics.

Since all internal topics as well as the embedded consumer group name are prefixed with the application ID, it is recommended to use ACLs on a prefixed resource pattern to configure control lists, allowing the client to manage all topics and consumer groups that start with this prefix, as in --resource-pattern-type prefixed --topic --operation All (see KIP-277 and KIP-290 for details).

For example, given the following setup of your Streams application:

• Config application.id value is team1-streams-app1.
• Authenticating with the Kafka cluster as a team1 user.
• The application's coded topology reads from input topics input-topic1 and input-topic2.
• The application's topology writes to output topics output-topic1 and output-topic2.

Then the following commands would create the necessary ACLs in the Kafka cluster to allow your application to operate:

# Allow Streams to read the input topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation Read --topic input-topic1 --topic input-topic2

# Allow Streams to write to the output topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation Write --topic output-topic1 --topic output-topic2

# Allow Streams to manage its own internal topics and consumer groups:
bin/kafka-acls ... --add --allow-principal User:team1 --operation All --resource-pattern-type prefixed --topic team1-streams-app1 --group team1-streams-app1
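
If you prefer to create these ACLs programmatically rather than via the CLI, here is a minimal sketch using the Java AdminClient. It uses the example principal and application.id from above; the bootstrap address is a placeholder, and a secured cluster would also need the appropriate security settings in the admin client's properties:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class CreateStreamsAcls {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder broker
        try (AdminClient admin = AdminClient.create(props)) {
            AccessControlEntry allowAll = new AccessControlEntry(
                    "User:team1", "*", AclOperation.ALL, AclPermissionType.ALLOW);
            // Prefixed patterns cover the internal topics and the consumer group,
            // since both are prefixed with the application.id.
            AclBinding topics = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "team1-streams-app1", PatternType.PREFIXED),
                    allowAll);
            AclBinding groups = new AclBinding(
                    new ResourcePattern(ResourceType.GROUP, "team1-streams-app1", PatternType.PREFIXED),
                    allowAll);
            admin.createAcls(Arrays.asList(topics, groups)).all().get();
        }
    }
}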