
A useful feature of Spark Structured Streaming is that it can join a static DataFrame with a streaming DataFrame. In the example below, users is a static DataFrame read from a database and transactionStream is a streaming DataFrame. Joining the two gives the spending per country, accumulated as new batches arrive.

val spendingByCountry = (transactionStream
    .join(users, users("id") === transactionStream("userid"))
    .groupBy($"country")
    .agg(sum($"cost") as "spending"))

spendingByCountry.writeStream
    .outputMode("complete")
    .format("console")
    .start()

The sum of cost is updated as new batches arrive, as shown below.

-------------------------------
Batch: 0
------------------------------- 
Country Spending
EN      90.0
FR      50.0

-------------------------------
Batch: 1
------------------------------- 
Country Spending
EN      190.0
FR      150.0

If I want to add notification and reset logic to the example above, what is the correct approach? The requirement is that when the spending exceeds some threshold, the country and spending record should be stored in a table and the spending should be reset to 0 so that it starts accumulating again.


1 Answer


One way to achieve this is arbitrary stateful processing. The groupBy can be replaced with groupByKey followed by a custom mapGroupsWithState function in which you maintain all the business logic you need. Here is an example taken from the Spark docs:

 import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

 // A mapping function that maintains an integer state for string keys and returns a string.
 // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
 def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {

   if (state.hasTimedOut) {                // If called when timing out, remove the state
     state.remove()

   } else if (state.exists) {              // If state exists, use it for processing
     val existingState = state.get         // Get the existing state
     val shouldRemove = ...                // Decide whether to remove the state
     if (shouldRemove) {
       state.remove()                      // Remove the state

     } else {
       val newState = ...
       state.update(newState)              // Set the new state
       state.setTimeoutDuration("1 hour")  // Set the timeout
     }

   } else {
     val initialState = ...
     state.update(initialState)            // Set the initial state
     state.setTimeoutDuration("1 hour")    // Set the timeout
   }
   ...
   // return something
 }




 dataset
   .groupByKey(...)
   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)
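
Applied to the question, the per-country logic inside such a state function could look roughly like the sketch below. It is written as plain Scala without a Spark dependency so that the rule itself is easy to check. The names Alert, ThresholdReset and step, and the threshold of 160.0 used in the usage lines, are hypothetical and not part of any Spark API.

```scala
// Hypothetical record that would be written to the notification table.
final case class Alert(country: String, spending: Double)

object ThresholdReset {
  /** One update step for a single country and a single micro-batch.
    *
    * Returns (new running total, optional alert).
    * Inside a Spark state function, `running` would typically come from
    * state.getOption.getOrElse(0.0), and the returned total would be
    * written back with state.update(...).
    */
  def step(country: String,
           running: Double,
           costs: Iterator[Double],
           threshold: Double): (Double, Option[Alert]) = {
    val total = running + costs.sum
    if (total > threshold) (0.0, Some(Alert(country, total))) // emit the record and reset to 0
    else (total, None)                                        // below threshold: keep accumulating
  }
}

// Usage with the EN figures from the question (90.0 in batch 0, another 100.0 in batch 1).
val (afterBatch0, alert0) = ThresholdReset.step("EN", 0.0, Iterator(90.0), 160.0)
val (afterBatch1, alert1) = ThresholdReset.step("EN", afterBatch0, Iterator(100.0), 160.0)
```

Since a country produces a record only when it crosses the threshold, flatMapGroupsWithState (which lets the function return zero or more rows per key) is probably a closer fit than mapGroupsWithState. The resulting stream of alerts can then be written to the table with writeStream.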