1
votes

I would like to aggregate a stream of trades into windows of the same trade volume, which is the sum of the trade size of all the trades in the interval.

I was able to write a custom Trigger that partitions the data into windows. Here is the code:

    case class Trade(key: Int, millis: Long, time: LocalDateTime, price: Double, size: Int)

class VolumeTrigger(triggerVolume: Int, config: ExecutionConfig) extends Trigger[Trade, Window] {
  val LOG: Logger = LoggerFactory.getLogger(classOf[VolumeTrigger])
  val stateDesc = new ValueStateDescriptor[Double]("volume", createTypeInformation[Double].createSerializer(config))

  override def onElement(event: Trade, timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    val volume = ctx.getPartitionedState(stateDesc)
    if (volume.value == null) {
      volume.update(event.size)
      return TriggerResult.CONTINUE
    }

    volume.update(volume.value + event.size)
    if (volume.value < triggerVolume) {
      TriggerResult.CONTINUE
    }
    else {
      volume.update(volume.value - triggerVolume)
      TriggerResult.FIRE_AND_PURGE
    }
  }

  override def onEventTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }

  override def onProcessingTime(time: Long, window:Window, ctx: TriggerContext): TriggerResult = {
    throw new UnsupportedOperationException("Not a processing time trigger")
  }

  override def clear(window: Window, ctx: TriggerContext): Unit = {
    val volume = ctx.getPartitionedState(stateDesc)
    ctx.getPartitionedState(stateDesc).clear()
  }
}

def main(args: Array[String]) : Unit = {

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  env.setParallelism(1)

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val trades = env
    .readTextFile("/tmp/trades.csv")
    .map {line =>
      val cells = line.split(",")
      val time = LocalDateTime.parse(cells(0), DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss.SSSSSSSSS"))
      val millis = time.toInstant(ZoneOffset.UTC).toEpochMilli
      Trade(0, millis, time, cells(1).toDouble, cells(2).toInt)
    }

  val aggregated = trades
    .assignAscendingTimestamps(_.millis)
    .keyBy("key")
    .window(GlobalWindows.create)
    .trigger(new VolumeTrigger(500, env.getConfig))
    .sum(4)

  aggregated.writeAsText("/tmp/trades_agg.csv")

  env.execute("volume agg")
}

The data looks for example as follows:

0180102 04:00:29.715706404,169.10,100
20180102 04:00:29.715715627,169.10,100
20180102 05:08:29.025299624,169.12,100
20180102 05:08:29.025906589,169.10,214
20180102 05:08:29.327113252,169.10,200
20180102 05:09:08.350939314,169.00,100
20180102 05:09:11.532817015,169.00,474
20180102 06:06:55.373584329,169.34,200
20180102 06:07:06.993081961,169.34,100
20180102 06:07:08.153291898,169.34,100
20180102 06:07:20.081524768,169.34,364
20180102 06:07:22.838656715,169.34,200
20180102 06:07:24.561360031,169.34,100
20180102 06:07:37.774385969,169.34,100
20180102 06:07:39.305219107,169.34,200

I have a time stamp, a price and a size.

The above code can partition it into windows of roughly the same size:

Trade(0,1514865629715,2018-01-02T04:00:29.715706404,169.1,514)
Trade(0,1514869709327,2018-01-02T05:08:29.327113252,169.1,774)
Trade(0,1514873215373,2018-01-02T06:06:55.373584329,169.34,300)
Trade(0,1514873228153,2018-01-02T06:07:08.153291898,169.34,464)
Trade(0,1514873242838,2018-01-02T06:07:22.838656715,169.34,600)
Trade(0,1514873294898,2018-01-02T06:08:14.898397117,169.34,500)
Trade(0,1514873299492,2018-01-02T06:08:19.492589659,169.34,400)
Trade(0,1514873332251,2018-01-02T06:08:52.251339070,169.34,500)
Trade(0,1514873337928,2018-01-02T06:08:57.928680090,169.34,1000)
Trade(0,1514873338078,2018-01-02T06:08:58.078221995,169.34,1000)

Now I like to partition the data so that the volume is exactly matching the trigger value. For this I would need to change the data slightly by splitting a trade at the end of an interval into two parts, one that belongs to the actual window being fired, and the remaining volume that is above the trigger value, has to be assigned to the next window.

Can that be handled with some custom aggregation function? It would need to know the results from the previous window(s) though and I was not able to find out how to do that.

Any ideas from Apache Flink experts how to handle that case?

Adding an evictor does not work as it only purges some elements at the beginning.

I hope the change from Spark Structured Streaming to Flink was a good choice, as I have later even more complicated situations to handle.

2
This is an intriguing question that I definitely think is doable, but you may end up needing to use a ProcessFunction and perhaps emit some side output (though not sure). Going to think on it! - Joshua DeWald
I found a working example using Flink stream connect and a custom CoProcessFunction. The implementation is a bit tricky as I have to handle all kinds of different cases, at least it is working. - Daniel

2 Answers

1
votes

Since your key is the same for all records, you may not require a window in this case. Please refer to this page in Flink's documentation https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-keyed-state. It has a CountWindowAverage class where in the aggregation of a value from each record in a stream is done using a State Variable. You can implement this and send the output whenever the state variable reaches your trigger volume and reset the value of the state variable with the remaining volume.

-1
votes

A simple approach (though not super-efficient) would be to put a FlatMapFunction ahead of your windowing flow. If it's keyed the same way, then you can use ValueState to track total volume, and emit two records (the split) when it hits your limit.