I am prototyping a Spark Structured Streaming (Spark 3.0) job that calculates aggregations and publishes the updates to Kafka. I need to calculate the all-time max date and max percentage (no windowing) for each group. The code works as expected except for Kafka tombstone records (deletes) in the source stream: when the stream receives a record with a valid key and a null value, the max aggregate continues to include the deleted record in the calculation. What are the best options to have the aggregate recalculated without the deleted records once a delete is consumed from Kafka?
Example
Messages produced:
<"user1|1", {"user": "user1", "pct":30, "timestamp":"2021-01-01 01:00:00"}>
<"user1|2", {"user": "user1", "pct":40, "timestamp":"2021-01-01 02:00:00"}>
<"user1|2", null>
Spark code snippet:
import org.apache.spark.sql.functions.{col, from_json, max}
import org.apache.spark.sql.types.StringType

val usageStreamRaw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", usageTopic)
  .load()
val usageStream = usageStreamRaw
  .select(
    col("key").cast(StringType).as("key"),
    from_json(col("value").cast(StringType), valueSchema).as("json"))
  .selectExpr("key", "json.*")
val usageAgg = usageStream.groupBy("user")
  .agg(
    max("timestamp").as("maxTime"),
    max("pct").as("maxPct")
  )
val sq = usageAgg.writeStream
  .outputMode("update")
  .option("truncate", "false")
  .format("console")
  .start()
sq.awaitTermination()
For user1 the result in column maxPct is 40, but it should be 30 after the deletion. Is there a good way to do this with Spark Structured Streaming?
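Simply filtering the tombstones out before the aggregation (e.g. with col("value").isNotNull) does not seem to help: the delete marker is skipped, but the 40 that was already folded into the running aggregation state is never retracted. The only direction I have sketched so far is to replace the built-in max aggregation with arbitrary state via flatMapGroupsWithState: keep every live record per user keyed by its Kafka key, drop an entry when a tombstone arrives, and recompute the max from what remains. Below is a rough, untested sketch; the UsageEvent/UsageMax case classes and the updateUser function are made up for illustration, and it assumes the user can be recovered from the Kafka key (the part before the |) and that pct/timestamp can be cast to double/string.

import org.apache.spark.sql.functions.{col, from_json, split}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import org.apache.spark.sql.types.{DoubleType, StringType}
import spark.implicits._

// Hypothetical shapes for this sketch: pct and timestamp are null for tombstones.
case class UsageEvent(key: String, user: String, pct: Option[Double],
                      timestamp: Option[String], isDelete: Boolean)
case class UsageMax(user: String, maxTime: String, maxPct: Double)

// Per-user state: Kafka key -> (pct, timestamp) for every record not yet deleted.
def updateUser(
    user: String,
    events: Iterator[UsageEvent],
    state: GroupState[Map[String, (Double, String)]]): Iterator[UsageMax] = {
  var live = state.getOption.getOrElse(Map.empty[String, (Double, String)])
  events.foreach { e =>
    if (e.isDelete) live = live - e.key                            // tombstone: drop the record
    else live = live.updated(e.key, (e.pct.get, e.timestamp.get))  // upsert the record
  }
  state.update(live)
  if (live.isEmpty) Iterator.empty
  else Iterator(UsageMax(
    user,
    live.values.map(_._2).max,  // lexicographic max works for "yyyy-MM-dd HH:mm:ss" strings
    live.values.map(_._1).max))
}

val usageEvents = usageStreamRaw
  .select(
    col("key").cast(StringType).as("key"),
    from_json(col("value").cast(StringType), valueSchema).as("json"),
    col("value").isNull.as("isDelete"))
  .select(
    col("key"),
    split(col("key"), "\\|").getItem(0).as("user"),  // user comes from the key, so tombstones still group correctly
    col("json.pct").cast(DoubleType).as("pct"),
    col("json.timestamp").cast(StringType).as("timestamp"),
    col("isDelete"))
  .as[UsageEvent]

val sq = usageEvents
  .groupByKey(_.user)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(updateUser _)
  .writeStream
  .outputMode("update")
  .format("console")
  .start()

This recomputes the max for a user every time one of its records is upserted or deleted, at the cost of keeping all live records for that user in state. Is something along these lines the intended approach, or is there a simpler way to make the built-in aggregation respect the deletes?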