I would like to aggregate a stream of trades into windows of equal trade volume, where a window's volume is the sum of the sizes of all trades in it.
I was able to write a custom Trigger that partitions the data into windows. Here is the code:

    import java.time.{LocalDateTime, ZoneOffset}
    import java.time.format.DateTimeFormatter

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
    import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
    import org.apache.flink.streaming.api.windowing.windows.Window
    import org.slf4j.{Logger, LoggerFactory}

    case class Trade(key: Int, millis: Long, time: LocalDateTime, price: Double, size: Int)

    class VolumeTrigger(triggerVolume: Int, config: ExecutionConfig) extends Trigger[Trade, Window] {
      val LOG: Logger = LoggerFactory.getLogger(classOf[VolumeTrigger])
      val stateDesc = new ValueStateDescriptor[Double]("volume", createTypeInformation[Double].createSerializer(config))

      override def onElement(event: Trade, timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {
        val volume = ctx.getPartitionedState(stateDesc)
        // An unset ValueState[Double] unboxes to 0.0 in Scala, so a null check
        // on volume.value never matches and is not needed.
        val total = volume.value + event.size
        if (total < triggerVolume) {
          volume.update(total)
          TriggerResult.CONTINUE
        } else {
          // Carry the overshoot into the counter for the next window.
          volume.update(total - triggerVolume)
          TriggerResult.FIRE_AND_PURGE
        }
      }

      override def onEventTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
        TriggerResult.FIRE_AND_PURGE
      }

      override def onProcessingTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
        throw new UnsupportedOperationException("Not a processing time trigger")
      }

      override def clear(window: Window, ctx: TriggerContext): Unit = {
        ctx.getPartitionedState(stateDesc).clear()
      }
    }

    def main(args: Array[String]): Unit = {
      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

      // Each input line is "timestamp,price,size"; all trades share key 0.
      val trades = env
        .readTextFile("/tmp/trades.csv")
        .map { line =>
          val cells = line.split(",")
          val time = LocalDateTime.parse(cells(0), DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss.SSSSSSSSS"))
          val millis = time.toInstant(ZoneOffset.UTC).toEpochMilli
          Trade(0, millis, time, cells(1).toDouble, cells(2).toInt)
        }

      val aggregated = trades
        .assignAscendingTimestamps(_.millis)
        .keyBy("key")
        .window(GlobalWindows.create())
        .trigger(new VolumeTrigger(500, env.getConfig))
        .sum(4) // field 4 of Trade is size

      aggregated.writeAsText("/tmp/trades_agg.csv")
      env.execute("volume agg")
    }

For example, the data looks like this:
20180102 04:00:29.715706404,169.10,100
20180102 04:00:29.715715627,169.10,100
20180102 05:08:29.025299624,169.12,100
20180102 05:08:29.025906589,169.10,214
20180102 05:08:29.327113252,169.10,200
20180102 05:09:08.350939314,169.00,100
20180102 05:09:11.532817015,169.00,474
20180102 06:06:55.373584329,169.34,200
20180102 06:07:06.993081961,169.34,100
20180102 06:07:08.153291898,169.34,100
20180102 06:07:20.081524768,169.34,364
20180102 06:07:22.838656715,169.34,200
20180102 06:07:24.561360031,169.34,100
20180102 06:07:37.774385969,169.34,100
20180102 06:07:39.305219107,169.34,200
Each row has a timestamp, a price, and a size.
The code above partitions the data into windows of roughly the same volume:
Trade(0,1514865629715,2018-01-02T04:00:29.715706404,169.1,514)
Trade(0,1514869709327,2018-01-02T05:08:29.327113252,169.1,774)
Trade(0,1514873215373,2018-01-02T06:06:55.373584329,169.34,300)
Trade(0,1514873228153,2018-01-02T06:07:08.153291898,169.34,464)
Trade(0,1514873242838,2018-01-02T06:07:22.838656715,169.34,600)
Trade(0,1514873294898,2018-01-02T06:08:14.898397117,169.34,500)
Trade(0,1514873299492,2018-01-02T06:08:19.492589659,169.34,400)
Trade(0,1514873332251,2018-01-02T06:08:52.251339070,169.34,500)
Trade(0,1514873337928,2018-01-02T06:08:57.928680090,169.34,1000)
Trade(0,1514873338078,2018-01-02T06:08:58.078221995,169.34,1000)
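The sums are uneven because the trigger carries the overshoot into the next window's counter, while the whole last trade stays in the window that fired. Here is a minimal sketch in plain Scala (no Flink) that mimics only the counter logic of the trigger. `TriggerSimulation` and `simulateTrigger` are hypothetical names used for illustration and are not part of the job above. With the 15 sample rows it reproduces the first five sums; the later output rows come from input not listed here.

```scala
object TriggerSimulation {
  /** Mimics the counter in VolumeTrigger.onElement and returns the summed
    * size of each fired window. Trades after the last firing are not reported. */
  def simulateTrigger(sizes: Seq[Int], triggerVolume: Int): Vector[Int] = {
    var counter = 0    // plays the role of the "volume" state
    var windowSum = 0  // what sum(4) would report for the current window
    var fired = Vector.empty[Int]
    for (size <- sizes) {
      counter += size
      windowSum += size
      if (counter >= triggerVolume) {
        counter -= triggerVolume    // overshoot is carried into the next window
        fired = fired :+ windowSum  // FIRE_AND_PURGE: emit, then reset contents
        windowSum = 0
      }
    }
    fired
  }

  def main(args: Array[String]): Unit = {
    val sizes = Seq(100, 100, 100, 214, 200, 100, 474, 200,
                    100, 100, 364, 200, 100, 100, 200)
    println(simulateTrigger(sizes, 500)) // Vector(514, 774, 300, 464, 600)
  }
}
```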
Now I would like to partition the data so that each window's volume matches the trigger value exactly. For this I would need to alter the data slightly by splitting the trade at the end of an interval into two parts: one that belongs to the window being fired, and the remaining volume above the trigger value, which has to be assigned to the next window.
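To make the desired result concrete, here is a minimal sketch of the splitting in plain Scala, independent of Flink. `VolumeSplit`, `Fill`, and `splitByVolume` are hypothetical names used only for illustration. The sketch also assumes that a trailing bucket which never reaches the target is simply dropped. It shows the output I am after, not how to express it with Flink windows.

```scala
object VolumeSplit {
  // One trade, or one part of a split trade.
  case class Fill(price: Double, size: Int)

  /** Cuts fills into buckets whose sizes sum to exactly `target`.
    * A fill crossing a bucket boundary is split into two or more parts.
    * Assumption: an incomplete trailing bucket is dropped. */
  def splitByVolume(fills: Seq[Fill], target: Int): Vector[Vector[Fill]] = {
    require(target > 0, "target must be positive")
    var done = Vector.empty[Vector[Fill]]
    var current = Vector.empty[Fill]
    var filled = 0
    for (f <- fills) {
      var remaining = f.size
      while (remaining > 0) {
        // Take only as much as still fits into the current bucket.
        val take = math.min(remaining, target - filled)
        current = current :+ Fill(f.price, take)
        filled += take
        remaining -= take
        if (filled == target) {
          done = done :+ current
          current = Vector.empty
          filled = 0
        }
      }
    }
    done
  }
}
```

For the first seven sample rows (sizes 100, 100, 100, 214, 200, 100, 474) and a target of 500, the trade of size 214 would be split into 200 and 14, and the trade of size 474 into 186 and 288, so the first two buckets each sum to exactly 500.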
Can this splitting be handled with a custom aggregation function? It would need to know the results of the previous window(s), though, and I could not find out how to do that.
Any ideas from Apache Flink experts on how to handle this case?
Adding an evictor does not work, as it only purges some elements at the beginning of the window.
I hope the switch from Spark Structured Streaming to Flink was a good choice, as I will have even more complicated situations to handle later.