
I read a CSV file, converted the value field to bytes, and wrote it to a Kafka topic using a Kafka producer application. Now I am trying to read from that topic using Structured Streaming, but I am not able to apply my custom Kryo deserialization to the value field.

Can anyone tell me how to use a custom deserializer in Structured Streaming?

1
Please read "Under what circumstances may I add “urgent” or other similar phrases to my question, in order to obtain faster answers?" The summary is that this is not an ideal way to address volunteers, and is probably counterproductive to obtaining answers. Please refrain from adding this to your questions. (halfer)

1 Answer


I had a similar problem: all of my Kafka messages were encoded with Protobuf, and I solved it with a UDF.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, StructType, TimestampType

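def deserialization_function(message):
    # Add your own code here to deserialize the raw message bytes and
    # produce x_deserializable, y_deserializable, w_deserializable and z_deserializable.
    # This returns a dict whose keys match the schema below, but you can return another structure.
    record = {"x": x_deserializable,
              "y": y_deserializable,
              "w": w_deserializable,
              "z": z_deserializable}
    return record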

schema = StructType() \
                    .add("x", TimestampType()) \
                    .add("y", StringType()) \
                    .add("z", StringType()) \
                    .add("w", StringType()) 

own_udf = udf(deserialization_function, schema)

stream = spark.readStream \
          .format("kafka") \
          .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
          .option("subscribe", topic) \
          .load()

query = stream \
        .select(col("value")) \
        .select((own_udf("value")).alias("value_udf")) \
        .select("value_udf.x", "value_udf.y", "value_udf.w", "value_udf.z")
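
For illustration, here is a minimal sketch of what the body of deserialization_function might look like. It assumes, purely hypothetically, that each Kafka value is a UTF-8 JSON object with the keys x, y, w and z, and that x holds epoch seconds. A real Kryo or Protobuf payload would need its own decoder in place of json.loads. Spark hands binary columns to a Python UDF as a bytearray, and Kafka values may be null, so both cases are handled.

```python
import json
from datetime import datetime, timezone


def deserialization_function(message):
    """Decode one Kafka value into a dict matching the x/y/z/w schema.

    Assumption (not from the original answer): the payload is UTF-8 JSON
    and "x" is a Unix timestamp in seconds.
    """
    if message is None:
        # Kafka values can be null, so pass the null through
        return None
    payload = json.loads(bytes(message).decode("utf-8"))
    return {
        # TimestampType expects a datetime object
        "x": datetime.fromtimestamp(payload["x"], tz=timezone.utc),
        "y": str(payload["y"]),
        "w": str(payload["w"]),
        "z": str(payload["z"]),
    }
```

Because the function is plain Python, it can be tested on sample bytes before it is wrapped with udf and applied to the stream.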