I am writing a Spark structured streaming application in PySpark to read data from Kafka.
However, we are on Spark 2.1.0, whose Kafka source does not let me set the consumer group id (group.id) as an option; it generates a unique group id for each query instead. Our Kafka cluster uses group-based authorization, which requires a preset group id.
Is there a workaround that lets me establish the connection without upgrading Spark to 2.2? My team does not want to upgrade.
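To make the restriction concrete, here is a plain-Python sketch of the behaviour as I understand it from the Spark 2.1 Kafka integration guide: certain consumer settings are rejected when supplied as options, and a fresh group id is generated per query. The helper names `check_source_options` and `generated_group_id`, the group name `my-authorized-group`, and the exact id format are assumptions for illustration only; this is not Spark's actual code.

```python
import uuid

# Consumer settings that, per my reading of the Spark 2.1 Kafka integration
# guide, the Kafka source refuses to accept from the user.
UNSUPPORTED_KAFKA_OPTIONS = {
    "kafka.group.id",
    "kafka.auto.offset.reset",
    "kafka.key.deserializer",
    "kafka.value.deserializer",
    "kafka.enable.auto.commit",
    "kafka.interceptor.classes",
}


def check_source_options(options):
    """Hypothetical helper: raise if any option is one the source rejects."""
    rejected = sorted(k for k in options if k.lower() in UNSUPPORTED_KAFKA_OPTIONS)
    if rejected:
        raise ValueError("Kafka option(s) not supported: " + ", ".join(rejected))
    return dict(options)


def generated_group_id():
    """Illustrative only: a fresh id per query; the real format may differ."""
    return "spark-kafka-source-" + str(uuid.uuid4())


# The options from my code pass the check because none of them is reserved.
options = {
    "kafka.bootstrap.servers": "host:9092",
    "subscribe": "record",
    "kafka.security.protocol": "SASL_PLAINTEXT",
}
accepted = check_source_options(options)

# Adding a preset group id (hypothetical name) is what gets rejected.
with_group = {**options, "kafka.group.id": "my-authorized-group"}
try:
    check_source_options(with_group)
except ValueError as err:
    print(err)
```

Because the id changes on every query, a broker ACL that only authorizes one fixed group name would never match it, which is the conflict described above.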
My code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    spark = SparkSession.builder.appName("DNS").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    # Subscribe to 1 topic
    lines = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "host:9092")
        .option("subscribe", "record")
        .option("kafka.security.protocol", "SASL_PLAINTEXT")
        .load()
    )
    print(lines.isStreaming)  # prints True

    # Kafka delivers `value` as binary; cast it to a string and keep the result
    lines = lines.selectExpr("CAST(value AS STRING)")

    # Split the lines into words
    words = lines.select(explode(split(lines.value, " ")).alias("word"))

    # Generate running word count
    wordCounts = words.groupBy("word").count()

    # Start running the query that prints the running counts to the console
    query = (
        wordCounts.writeStream
        .outputMode("complete")
        .format("console")
        .start()
    )
    query.awaitTermination()
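For reference, the transformation itself is independent of the Kafka connection. A minimal plain-Python sketch of what the split, explode, and count steps are meant to compute, assuming each message value is already a string (the function name `running_word_count` and the sample messages are made up for illustration):

```python
from collections import Counter


def running_word_count(messages, counts=None):
    """Split each message on single spaces and add the words to the running totals."""
    counts = Counter() if counts is None else counts
    for value in messages:
        counts.update(value.split(" "))
    return counts


# Two hypothetical micro-batches; the totals carry over between them,
# similar to what the "complete" output mode prints after each trigger.
totals = running_word_count(["a b a", "b c"])
totals = running_word_count(["c c"], totals)
print(dict(totals))
```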
group.id in Spark 2.2 as well - spark.apache.org/docs/latest/… – himanshuIIITian