I found this blog on the Databricks website. It shows how Spark SQL's APIs can be used to consume and transform complex data streams from Apache Kafka:
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
There is a section explaining how a UDF can be used to deserialize rows:
object MyDeserializerWrapper {
  val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
  MyDeserializerWrapper.deser.deserialize(topic, bytes)
)

df.selectExpr("""deserialize("topic1", value) AS message""")
I am using Java and therefore had to write the following sample UDF, to check how it can be called from Java:
UDF1<byte[], String> mode = new UDF1<byte[], String>() {
    @Override
    public String call(byte[] bytes) throws Exception {
        // decode the Kafka value bytes and prefix them, just to prove the UDF was applied
        String s = new String(bytes);
        return "_" + s;
    }
};
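Note that callUDF("mode", …) in the next snippet can only resolve the name if the UDF is registered with the session first. A one-line sketch of the registration I am assuming (string return type to match the sample above, DataTypes from org.apache.spark.sql.types):

spark.udf().register("mode", mode, DataTypes.StringType);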
Now I can use this UDF in the Structured Streaming word count example, as follows:
Dataset<String> words = df
        // previously: .selectExpr("CAST(value AS STRING)")
        // now the value bytes go through the UDF, and the resulting single-column
        // DataFrame is converted to a Dataset of String using .as(Encoders.STRING())
        .select(callUDF("mode", col("value")))
        .as(Encoders.STRING())
        .flatMap(
            new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String x) {
                    return Arrays.asList(x.split(" ")).iterator();
                }
            }, Encoders.STRING());
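For completeness, the rest of the pipeline is just the standard word count from the Structured Streaming guide; a sketch with a console sink only to eyeball the prefixed words:

// running count of each (prefixed) word
Dataset<Row> wordCounts = words.groupBy("value").count();

StreamingQuery query = wordCounts.writeStream()
        .outputMode("complete")
        .format("console")
        .start();

query.awaitTermination();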
The next step for me is to write a UDF for the Thrift deserialization. I will post it as soon as I am done.
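In the meantime, here is roughly the shape I expect it to take, following the wrapper pattern from the blog so the non-serializable Thrift TDeserializer is created on the executor rather than shipped with the UDF closure. This is only a sketch: MyEvent stands in for the actual Thrift-generated class, and returning a plain String is just for illustration.

// needed imports: org.apache.spark.sql.api.java.UDF1, org.apache.spark.sql.types.DataTypes,
// org.apache.thrift.TDeserializer, org.apache.thrift.protocol.TBinaryProtocol

// Mirrors the blog's MyDeserializerWrapper: one deserializer per executor JVM,
// so the TDeserializer itself is never captured in the UDF closure.
class ThriftDeserializerWrapper {
    static final TDeserializer DESER = create();

    private static TDeserializer create() {
        try {
            return new TDeserializer(new TBinaryProtocol.Factory());
        } catch (Exception e) { // newer Thrift releases declare a checked exception here
            throw new RuntimeException(e);
        }
    }
}

// in the job setup; "MyEvent" is a placeholder for the real Thrift-generated class
UDF1<byte[], String> deserializeThrift = new UDF1<byte[], String>() {
    @Override
    public String call(byte[] bytes) throws Exception {
        MyEvent event = new MyEvent();
        ThriftDeserializerWrapper.DESER.deserialize(event, bytes);
        return event.toString(); // or pull out the individual fields instead
    }
};

spark.udf().register("deserializeThrift", deserializeThrift, DataTypes.StringType);

df.select(callUDF("deserializeThrift", col("value")));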