I use Spark 2.1.
I am trying to read records from Kafka using Spark Structured Streaming, deserialize them and apply aggregations afterwards.
I have the following code:
SparkSession spark = SparkSession
    .builder()
    .appName("Statistics")
    .getOrCreate();

Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaUri)
    .option("subscribe", "Statistics")
    .option("startingOffsets", "earliest")
    .load();
df.selectExpr("CAST(value AS STRING)");
What I want is to deserialize the value field into my own object instead of casting it as a String.
I have a custom deserializer for this, with the following signature:
public StatisticsRecord deserialize(String s, byte[] bytes)
How can I do this in Java?
The only relevant link I have found is this Databricks blog post, https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html, but its examples are in Scala.
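For reference, here is a self-contained sketch of the deserializer shape I have in mind. The StatisticsRecord fields and the "name,count" payload format are purely illustrative stand-ins for my real class; the open question is how to plug something like this into the streaming Dataset in Java:

```java
import java.nio.charset.StandardCharsets;

// Illustrative bean; the real StatisticsRecord has more fields.
class StatisticsRecord {
    private final String name;
    private final long count;

    public StatisticsRecord(String name, long count) {
        this.name = name;
        this.count = count;
    }

    public String getName() { return name; }
    public long getCount() { return count; }
}

class StatisticsRecordDeserializer {
    // Same shape as Kafka's Deserializer#deserialize(topic, data).
    public StatisticsRecord deserialize(String topic, byte[] bytes) {
        // Assume a simple "name,count" payload for illustration only.
        String[] parts = new String(bytes, StandardCharsets.UTF_8).split(",");
        return new StatisticsRecord(parts[0], Long.parseLong(parts[1]));
    }
}

public class Demo {
    public static void main(String[] args) {
        StatisticsRecord r = new StatisticsRecordDeserializer()
            .deserialize("Statistics", "cpu,42".getBytes(StandardCharsets.UTF_8));
        System.out.println(r.getName() + " " + r.getCount()); // prints "cpu 42"
    }
}
```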