I have a Spark application that reads data from a Kafka topic. My code is:

val df = spark.readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "server1")
 .option("subscribe", "topic1")
 .load()
df.printSchema()

This yields

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)

I need to

  1. query for both the key and value
  2. deserialize the binary key to a string
  3. deserialize the binary value to an object (I have a deserializer for this)
  4. repartition the input tuples/DataSet based on the key string from step 2

Would the code below be the correct way to do it?

val dataset = spark.readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "server1")
     .option("subscribe", "topic1")
     .load()
     .select($"key", $"value")
     .selectExpr("CAST(key as STRING) AS partitionId", "value AS value")
     .repartition(col("partitionId"))
     .mapPartitions{row => createOutputMessages()}

It runs fine, but I have no way of verifying that the partitioning was done correctly.

1 Answer

You can get an idea of how your dataset is partitioned by using the built-in function spark_partition_id() (from org.apache.spark.sql.functions). Something like this will show the key distribution across Spark partitions:

dataset.groupBy(spark_partition_id(), $"partitionId").count().show(false)
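
One caveat: show() cannot be called on a streaming Dataset (Spark raises an AnalysisException asking for writeStream.start()), and the partitionId column is probably gone once mapPartitions has run. So one way to try the check is on a batch read of the same topic, stopping before the mapPartitions step. Below is a minimal sketch under those assumptions; the names PartitionCheck, keyed and sparkPartition are made up for illustration, and the server and topic values are just copied from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, spark_partition_id}

object PartitionCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partition-check") // hypothetical application name
      .getOrCreate()

    // Batch read (spark.read rather than spark.readStream) so that show() is allowed.
    val keyed = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "server1")
      .option("subscribe", "topic1")
      .load()
      .selectExpr("CAST(key AS STRING) AS partitionId", "value")
      .repartition(col("partitionId"))

    // One output row per (Spark partition, key) pair with its record count.
    // If the repartition worked, each key shows up under a single Spark partition id.
    keyed
      .groupBy(spark_partition_id().as("sparkPartition"), col("partitionId"))
      .count()
      .orderBy("sparkPartition", "partitionId")
      .show(false)

    spark.stop()
  }
}
```

Several keys can legitimately share one Spark partition, since the number of partitions is fixed. What you are looking for is that no single key is spread over more than one partition.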