I am trying to read data from Kafka via Spark Structured Streaming. However, in Spark 2.4.0 you cannot set the group id for the stream (see How to set group.id for consumer group in kafka data source in Structured Streaming?).

Because the group id cannot be set, Spark generates one itself, and the query fails with a GroupAuthorizationException:

19/12/10 15:15:00 ERROR streaming.MicroBatchExecution: Query [id = 747090ff-120f-4a6d-b20e-634eb77ac7b8, runId = 63aa4cce-ad72-47f2-80f6-e87947b69685] terminated with error
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2

Any ideas on how to work around this? Oddly, I can read the same data with kafka-console-consumer.sh, where I can pass the group id in a .properties file.

Code throwing the exception:

val df = spark
  .readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("startingOffsets", "earliest")
  .option("kafka.group.id", "idThatShouldBeUsed")
  .option("kafka.bootstrap.servers", "server")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.truststore.location", "/location")
  .option("kafka.ssl.truststore.password", "pass")
  .option("kafka.sasl.jaas.config", """jaasToUse""")
  .load()
  .writeStream
  .outputMode("append")
  .format("console")
  .option("startingOffsets", "earliest")
  .start().awaitTermination()
Group id shouldn't determine authentication. The JKS files and JAAS properties should - OneCricketeer
Well, it seems to - the same problem can be solved by using wildcards when granting rights to the group (stackoverflow.com/questions/48545215/…). However, I am not allowed to change these Kafka settings. - Tomáš Sedloň
I was under the impression those are "user groups", not the consumer's "group id". The Authorizer is pluggable, by the way, but you must work with your Kafka admins to adjust these settings - OneCricketeer

1 Answer

It seems that this cannot be solved from the consumer's side. We ended up using bin/kafka-acls.sh with a prefixed (wildcard-style) group pattern to allow all group ids generated by Structured Streaming.
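
To illustrate why a single prefix rule is enough, here is a minimal sketch of prefix matching against the group id from the error message. The object and method names are made up for illustration and are not Kafka APIs, and the layout used to build a sample id is only an assumption based on the logged id.

```scala
// Sketch of prefix-based matching; names are illustrative, not Kafka APIs.
object PrefixedAclSketch {
  // Prefix shared by the generated group id seen in the error message.
  val GrantedPrefix = "spark-kafka-source-"

  // A prefixed rule covers every resource name that starts with the prefix.
  def isCovered(groupId: String, prefix: String = GrantedPrefix): Boolean =
    groupId.startsWith(prefix)

  // Builds an id shaped like the logged one.
  // The exact layout Spark uses is an assumption here.
  def sampleGroupId(suffix: String = "driver-0"): String =
    s"$GrantedPrefix${java.util.UUID.randomUUID()}-$suffix"

  def main(args: Array[String]): Unit = {
    val fromLog =
      "spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2"
    println(isCovered(fromLog))              // true
    println(isCovered(sampleGroupId()))      // true
    println(isCovered("idThatShouldBeUsed")) // false
  }
}
```

Because the random part of the id changes on every query start, an ACL on one literal group name would stop matching after a restart, while a rule on the common prefix keeps matching.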

Kafka ACL example:

bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zk:2181 --add --allow-principal User:'Bon' --operation READ --topic topicName --group='spark-kafka-source-' --resource-pattern-type prefixed
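
For readers on a newer Spark: as far as I know, from Spark 3.0 onwards the Kafka source accepts kafka.group.id as well as a groupIdPrefix option, so the group id can be aligned with an existing ACL from the consumer side. A minimal sketch, assuming Spark 3.x with the spark-sql-kafka-0-10 package on the classpath. The server, topic and group id are the placeholders from the question, "authorizedPrefix" is a hypothetical value, and the security options are left out for brevity.

```scala
import org.apache.spark.sql.SparkSession

object FixedGroupIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fixed-group-id-sketch")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "server") // placeholder from the question
      .option("subscribe", "topic")                // placeholder from the question
      .option("startingOffsets", "earliest")
      // Assumes Spark 3.0+: use a group id the principal is already authorized for.
      .option("kafka.group.id", "idThatShouldBeUsed")
      // Alternative (ignored when kafka.group.id is set): keep generated ids
      // but change their prefix. "authorizedPrefix" is a hypothetical value.
      // .option("groupIdPrefix", "authorizedPrefix")
      .load()

    // Print each micro-batch to the console until the query is stopped.
    df.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

The Spark documentation advises caution with a fixed group id: queries that run concurrently with the same id are likely to interfere with each other, so each may read only part of the data. This does not help on Spark 2.4.0, where the ACL change above remains the workaround.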