2 votes

I am new to Spark. I use Structured Streaming to read data from Kafka.

I can read the data using this code in Scala:

val data = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topics)
      .option("startingOffsets", startingOffsets) 
      .load()

The data in the value column are Thrift records. The streaming API gives the data in binary format. I have seen examples of casting the data to string or JSON, but I cannot find any examples of how to deserialize the data back into Thrift objects.
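
The string cast I have seen in those examples looks roughly like this, which obviously does not give me back Thrift objects:

// cast the raw Kafka key/value bytes to strings, as the examples do;
// this works for text or JSON payloads, but not for Thrift-encoded bytes
val asString = data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")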

How can I achieve this?


2 Answers

1 vote

Well, here is the follow-up solution. I can't post my own code, but here is public code you can use; credit goes to its author.

https://github.com/airbnb/airbnb-spark-thrift/blob/master/src/main/scala/com/airbnb/spark/thrift/

First, you need to convert the Array[Byte] value to a Row by calling the convertObject function; let's call it makeRow.

Second, you need to get the StructType/schema of your Thrift class by calling the convert function; let's call the result schema.

Then you need to create a UDF like this: val deserializer = udf((bytes: Array[Byte]) => makeRow(bytes), schema)

Note: you cannot directly use makeRow without passing the schema; otherwise Spark complains: Schema for type org.apache.spark.sql.Row is not supported
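
Putting the three steps together, a minimal sketch could look like this. ThriftSchemaConverter and ThriftParser refer to the classes in the linked directory, but the exact signatures shown here are assumptions on my side (check the repo), and MyThriftStruct stands for your generated Thrift class:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.StructType
import org.apache.thrift.TDeserializer
import org.apache.thrift.protocol.TBinaryProtocol

// 1. Derive the schema of the Thrift class once, on the driver
val schema: StructType = ThriftSchemaConverter.convert(classOf[MyThriftStruct])

// 2. Turn the Kafka value (Array[Byte]) into a Row
def makeRow(bytes: Array[Byte]): Row = {
  val obj = new MyThriftStruct()
  new TDeserializer(new TBinaryProtocol.Factory()).deserialize(obj, bytes)
  ThriftParser.convertObject(obj, schema) // assumed signature, see the repo
}

// 3. Wrap it in a UDF, passing the schema explicitly
val deserializer = udf((bytes: Array[Byte]) => makeRow(bytes), schema)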

Then you can use it in this way:

val stuff = kafkaStuff.withColumn("data", deserializer(kafkaStuff("value")))
val finalStuff = stuff.select("data.*")

And...you are done! Hope this helps.

Credit also goes to this post, Spark UDF for StructType / Row, which gave me the final idea when my previous solution was so close.

0 votes

I found this blog post on the Databricks website. It shows how Spark SQL’s APIs can be leveraged to consume and transform complex data streams from Apache Kafka.

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

There is a section explaining how a UDF can be used to deserialize rows:

object MyDeserializerWrapper {
  val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) => 
  MyDeserializerWrapper.deser.deserialize(topic, bytes)
)

df.selectExpr("""deserialize("topic1", value) AS message""")
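
Adapted to Thrift, the same wrapper pattern could look roughly like this in Scala. This is only a sketch: MyEvent stands for whatever generated Thrift class you have, and I am assuming the standard org.apache.thrift.TDeserializer with the binary protocol:

import org.apache.thrift.TDeserializer
import org.apache.thrift.protocol.TBinaryProtocol

// one deserializer per executor JVM, same idea as MyDeserializerWrapper above
object ThriftDeserializerWrapper {
  val deser = new TDeserializer(new TBinaryProtocol.Factory())
}

spark.udf.register("deserializeThrift", (bytes: Array[Byte]) => {
  val event = new MyEvent() // hypothetical generated Thrift struct
  ThriftDeserializerWrapper.deser.deserialize(event, bytes)
  event.toString            // or pull out the fields you actually need
})

df.selectExpr("deserializeThrift(value) AS message")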

I am using Java and therefore had to write the following sample UDF, to check how it can be called from Java:

UDF1<byte[], String> mode = new UDF1<byte[], String>() {
    @Override
    public String call(byte[] bytes) throws Exception {
        String s = new String(bytes);
        return "_" + s;
    }
};

// register it under the name "mode" so callUDF("mode", ...) can find it
spark.udf().register("mode", mode, DataTypes.StringType);

Now I can use this UDF in the Structured Streaming word count example, as follows:

Dataset<String> words = df
        // run the Kafka value through the UDF instead of .selectExpr("CAST(value AS STRING)"),
        // then convert the resulting DataFrame to a Dataset of String
        .select(callUDF("mode", col("value")))
        .as(Encoders.STRING())
        .flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public Iterator<String> call(String x) {
                        return Arrays.asList(x.split(" ")).iterator();
                    }
                }, Encoders.STRING());

The next step for me is to write a UDF for the Thrift deserialization. I will post it as soon as I am done.