3 votes

I am trying to aggregate and compute some metrics with Spark Streaming (reading from Kafka) every minute. I am able to aggregate the data for each particular minute. How do I keep a bucket for the current day and sum up the aggregate values of all the minutes in that day?

I have a data frame and I am doing something similar to this.

sampleDF = spark.sql("select userId,sum(likes) as total from likes_dataset group by userId order by userId")

2 Answers

1 vote

You can make use of the watermarking feature of Structured Streaming.

Sample code

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
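
Applied to the data in the question, a daily bucket is just a one-day tumbling window per userId. A rough PySpark sketch (the likesDF placeholder, the extra column names, and the one-hour watermark delay are assumptions, not from the question):

from pyspark.sql import functions as F

# likesDF: streaming DataFrame of schema { timestamp: Timestamp, userId: String, likes: Int },
# built from the Kafka source (omitted here, as in the Scala example above)
likesDF = ...

dailyTotals = (likesDF
    # allow events to arrive up to 1 hour late before a day's bucket is finalized
    .withWatermark("timestamp", "1 hour")
    # one tumbling 1-day window per userId = one bucket per calendar day
    .groupBy(F.window(F.col("timestamp"), "1 day"), F.col("userId"))
    .agg(F.sum("likes").alias("total")))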
1 vote

I figured out what was going on. I learned about stateful streaming in Spark, and that solved it for me.

All I had to do was:

running_counts = countStream.updateStateByKey(updateTotalCount, initialRDD=initialStateRDD) 

where I had to write the updateTotalCount function to specify how to merge the old aggregate data with the new aggregate data from each micro batch. In my case, the update function looks like this:

def updateTotalCount(currentCount, countState):
    if countState is None:
        countState = 0
    return sum(currentCount) + countState
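
For completeness, here is roughly how that update function fits into a DStream job. This is a minimal sketch rather than the actual pipeline: the socket source, checkpoint path, app name, and seed state are stand-ins, and the real job would build countStream from Kafka instead.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def updateTotalCount(currentCount, countState):
    # currentCount: list of new values for a key in the current micro batch
    # countState: running total carried over from previous batches (None on first update)
    if countState is None:
        countState = 0
    return sum(currentCount) + countState

sc = SparkContext(appName="RunningLikeTotals")
ssc = StreamingContext(sc, 60)            # 1-minute micro batches, as in the question
ssc.checkpoint("/tmp/likes-checkpoint")   # updateStateByKey requires a checkpoint directory

# Stand-in source: lines of "userId,likes" from a socket, in place of the Kafka stream
lines = ssc.socketTextStream("localhost", 9999)
countStream = lines.map(lambda line: line.split(",")).map(lambda p: (p[0], int(p[1])))

# Optional seed for the state, e.g. totals carried over from an earlier run
initialStateRDD = sc.parallelize([("someUser", 0)])

running_counts = countStream.updateStateByKey(updateTotalCount, initialRDD=initialStateRDD)
running_counts.pprint()   # per-user running totals, refreshed every minute

ssc.start()
ssc.awaitTermination()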