4 votes

I am writing a Spark structured streaming application in PySpark to read data from Kafka.

However, the current version of Spark in my environment is 2.1.0, which does not allow setting a group id as a parameter; instead, it generates a unique id for each query. But the Kafka connection uses group-based authorization, which requires a pre-set group id.

Hence, is there any workaround to establish the connection without upgrading Spark to 2.2, since my team does not want to upgrade?

My Code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    spark = SparkSession.builder.appName("DNS").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    # Subscribe to 1 topic
    lines = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "host:9092") \
        .option("subscribe", "record") \
        .option("kafka.security.protocol", "SASL_PLAINTEXT") \
        .load()
    print(lines.isStreaming)  # prints True

    # Cast the Kafka message value from bytes to string
    lines = lines.selectExpr("CAST(value AS STRING)")

    # Split the lines into words
    words = lines.select(
        explode(
            split(lines.value, " ")
        ).alias("word")
    )

    # Generate running word count
    wordCounts = words.groupBy("word").count()

    # Start running the query that prints the running counts to the console
    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()
I don't think you can set group.id in Spark 2.2 either – spark.apache.org/docs/latest/… – himanshuIIITian
According to this Databricks doc: "Since Spark 2.2, you can optionally set the group id. However, use it with extreme caution as this may cause unexpected behavior." – ELI
Strange! Because according to the Spark 2.2 documentation, we cannot. Maybe there is a mismatch between the two documentations. – himanshuIIITian
Yeah, but anyway, I am not planning to update Spark. – ELI
I am not sure about the unique id for each query. – thebluephantom

2 Answers

1 vote

The KafkaUtils class overrides the parameter value for "group.id". It concatenates "spark-executor-" in front of the original group id.

Below is the code from KafkaUtils where it does this:

// driver and executor should be in different consumer groups
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
    if (null == originalGroupId) {
      logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
    }
    val groupId = "spark-executor-" + originalGroupId
    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)

We faced the same problem. Kafka access was ACL-based with a preset group id, so the only option was to alter the group id in the Kafka configuration: instead of our original group id, we authorized "spark-executor-" + originalGroupId.
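
If the ACL change can be scripted, a minimal sketch using the kafka-python admin client could look like the following; the broker address, principal, and group name are made up for illustration, and the admin connection would additionally need the same SASL settings as your cluster:

from kafka.admin import (KafkaAdminClient, ACL, ACLOperation,
                         ACLPermissionType, ResourcePattern, ResourceType)

# Connection details are placeholders; SASL/auth options omitted for brevity
admin = KafkaAdminClient(bootstrap_servers="host:9092")

original_group_id = "dns-consumer"
executor_group_id = "spark-executor-" + original_group_id  # the group the executors actually use

# Allow the Spark application's principal to read as the prefixed consumer group
acl = ACL(
    principal="User:spark-app",
    host="*",
    operation=ACLOperation.READ,
    permission_type=ACLPermissionType.ALLOW,
    resource_pattern=ResourcePattern(ResourceType.GROUP, executor_group_id),
)
admin.create_acls([acl])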

1 vote

Setting group.id is now possible with Spark 3.x. See the Structured Streaming + Kafka Integration Guide, where it says:

kafka.group.id: The Kafka group id to use in Kafka consumer while reading from Kafka. Use this with caution. By default, each query generates a unique group id for reading data. This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use a specific authorized group id to read data. You can optionally set the group id. However, do this with extreme caution as it can cause unexpected behavior. Concurrently running queries (both, batch and streaming) or sources with the same group id are likely interfere with each other causing each query to read only part of the data. This may also occur when queries are started/restarted in quick succession. To minimize such issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to be very small. When this is set, option "groupIdPrefix" will be ignored.
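
For illustration, a minimal PySpark sketch on Spark 3.x that passes an authorized group id (broker, topic, and group name are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DNS").getOrCreate()

# With kafka.group.id set, Spark uses this consumer group instead of generating one
# (and any groupIdPrefix option is ignored, per the docs quoted above)
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host:9092") \
    .option("subscribe", "record") \
    .option("kafka.security.protocol", "SASL_PLAINTEXT") \
    .option("kafka.group.id", "my-authorized-group") \
    .load() \
    .selectExpr("CAST(value AS STRING)")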

However, this group.id is still not used to commit offsets back to Kafka, and offset management remains within Spark's checkpoint files. I have given more details (also for Spark < 3.x) in my answers: