I have a Spark application that reads data from a Kafka topic. My code is:
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("subscribe", "topic1")
  .load()
df.printSchema()
This yields
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
I need to:
- query for both the key and value
- deserialize the binary key to a string
- deserialize the binary value to an object (I have a deserializer for this; a rough sketch of these steps follows the list)
- repartition the input tuples/Dataset based on the key string from step 2
Would the code below be the correct way to do it?
import org.apache.spark.sql.functions.col
import spark.implicits._

val dataset = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("subscribe", "topic1")
  .load()
  .select($"key", $"value")
  .selectExpr("CAST(key AS STRING) AS partitionId", "value")
  .repartition(col("partitionId"))
  .mapPartitions { iter => createOutputMessages() } // iter is the partition's Iterator[Row]
It runs fine, but I have no way to confirm that the repartitioning actually grouped the records by key. The only diagnostic I can think of is sketched below; is that a reasonable way to verify it?