I was able to run Kafka Structured Streaming programs before, but suddenly all my Python Structured Streaming programs are failing with an error. Even the basic Kafka Structured Streaming example I took from the Spark website fails with the same error.

py4j.protocol.Py4JJavaError: An error occurred while calling o31.load. :
java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:376)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)

The spark-submit command I am using:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 C:\Users\ranjith.gangam\PycharmProjects\sparktest\Structured_streaming.py

This is the code I took from the Spark GitHub repository:

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# In the original Spark example, the Kafka connection settings are
# read from the command-line arguments
bootstrapServers, subscribeType, topics = sys.argv[1:4]

spark = SparkSession\
    .builder\
    .appName("StructuredKafkaWordCount")\
    .getOrCreate()

# Create DataSet representing the stream of input lines from kafka
lines = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", bootstrapServers)\
    .option(subscribeType, topics)\
    .load()\
    .selectExpr("CAST(value AS STRING)")

words = lines.select(
    # explode turns each item in an array into a separate row
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

# Generate running word count
wordCounts = words.groupBy('word').count()
# Start running the query that prints the running counts to the console
query = wordCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .start()

query.awaitTermination()
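For completeness, the script above expects bootstrapServers, subscribeType and topics to arrive as command-line arguments, as in the Spark example it was taken from, so the full invocation would look something like the sketch below. The broker address and topic name are placeholders; substitute your own values.

```shell
# Placeholder broker (localhost:9092) and topic (test-topic); replace as needed.
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 C:\Users\ranjith.gangam\PycharmProjects\sparktest\Structured_streaming.py localhost:9092 subscribe test-topic
```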

1 Answer


You are on the right track, but unfortunately Kafka 0.10 is not yet supported by PySpark, as you can see in SPARK-16534.

The only Kafka version PySpark supports so far is 0.8, so you can either migrate to Kafka 0.8 or port your code to Scala.