
I need to perform an aggregation on incoming data based on the Spark driver's timestamp, without a watermark. My data doesn't have any timestamp field.

The requirement is to compute an average of the data received every second (it doesn't matter when it was sent).

For example, I need an aggregation over the data received in each trigger, just like the old RDD-based streaming API.

Is there a way to do that?


2 Answers

1 vote

You can create your own Sink and perform your operation in each addBatch() call:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

class CustomSink extends Sink {
  // Called once per micro-batch; `data` contains only the rows received
  // since the previous trigger, so the aggregation is per-trigger.
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    data.groupBy().agg(sum("age") as "sumAge").foreach(v => println(s"RESULT=$v"))
  }
}

class CustomSinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new CustomSink()
  }

  override def shortName(): String = "custom"
}

With outputMode set to Update and a processing-time trigger every X seconds:

  val query = ds.writeStream
    .trigger(Trigger.ProcessingTime("1 second"))
    .outputMode(OutputMode.Update())
    .format("exactlyonce.CustomSinkProvider")
    .start()
0 votes

Does "Trigger by processing time" fit your requirements? "Trigger by processing time" triggers every interval(defined by code).

Example trigger code is linked below.

https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala#L34
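
For illustration, here is a minimal sketch of a one-second processing-time trigger; the rate source and console sink are only placeholders for your real input and output:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("processing-time-trigger").getOrCreate()

// The built-in `rate` source is just a stand-in for the real stream.
val df = spark.readStream.format("rate").load()

// Fire a micro-batch every second; each batch covers the rows
// received during that interval.
val query = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

query.awaitTermination()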