I had a similar problem: all of my Kafka messages were Protobuf-encoded, and I solved it with a UDF.
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StringType, TimestampType
def deserialization_function(message):
    # Add your own deserialization code here; the *_deserialized names below
    # are placeholders for the fields you extract from the Protobuf message.
    # I return a dict matching the schema below, but you can return another structure.
    return {"x": x_deserialized,
            "y": y_deserialized,
            "z": z_deserialized,
            "w": w_deserialized}
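For illustration, here is a minimal sketch of what that body might look like, assuming your .proto file was compiled with protoc into a Python module (MyMessage and my_message_pb2 are hypothetical names, substitute your own generated class):

import datetime
from my_message_pb2 import MyMessage  # hypothetical protoc-generated module

def deserialization_function(message):
    # Spark hands the Kafka "value" column to the UDF as binary (a bytearray).
    proto = MyMessage()
    proto.ParseFromString(bytes(message))
    return {"x": datetime.datetime.fromtimestamp(proto.x),  # epoch seconds -> TimestampType
            "y": proto.y,
            "z": proto.z,
            "w": proto.w}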
schema = StructType() \
    .add("x", TimestampType()) \
    .add("y", StringType()) \
    .add("z", StringType()) \
    .add("w", StringType())
own_udf = udf(deserialization_function, schema)
stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .load()
query = stream \
    .select(col("value")) \
    .select(own_udf("value").alias("value_udf")) \
    .select("value_udf.x", "value_udf.y", "value_udf.w", "value_udf.z")