I have two streams, each carrying data aggregated over 1-hour windows. I want to zip these streams so that aggregations over the same timespan are tupled together, with an empty value on one side if no corresponding match exists.
    DataStream<OneHourAggA> one =
        sourceA
            .keyBy(d -> (String) d.values.get("m"))
            .timeWindow(Time.hours(1))
            .apply(new WorkWindwFolder());

    DataStream<OneHourAggB> other =
        sourceB
            .keyBy(d -> (String) d.values.get("m"))
            .timeWindow(Time.hours(1))
            .apply(new WorkWindwFolder());

    // Desired: pair up the two aggregated streams, window by window.
    DataStream<Tuple2<Option<OneHourAggA>, Option<OneHourAggB>>> zipped =
        one.???(other)
How can I achieve this?
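To make the intended result concrete, here is a minimal plain-Java sketch (no Flink involved) of the pairing I am after. It is only an illustration of the semantics, not an implementation: `ZipSketch`, `Zipped` and `zip` are hypothetical names, `java.util.Optional` stands in for `Option`, and each aggregate is assumed to be identified by its window start in milliseconds. A real job would also have to match on the `"m"` key, which is left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.TreeSet;

final class ZipSketch {

    /** One output element: the aggregates of both sides for a single window. */
    public static final class Zipped<A, B> {
        public final long windowStart;
        public final Optional<A> left;
        public final Optional<B> right;

        Zipped(long windowStart, Optional<A> left, Optional<B> right) {
            this.windowStart = windowStart;
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return windowStart + ": (" + left + ", " + right + ")";
        }
    }

    /** Full outer pairing: every window seen on either side yields one element. */
    public static <A, B> List<Zipped<A, B>> zip(Map<Long, A> a, Map<Long, B> b) {
        // Union of all window starts, in ascending order.
        TreeSet<Long> starts = new TreeSet<>(a.keySet());
        starts.addAll(b.keySet());

        List<Zipped<A, B>> out = new ArrayList<>();
        for (Long start : starts) {
            // A side with no aggregate for this window becomes Optional.empty().
            out.add(new Zipped<>(start,
                    Optional.ofNullable(a.get(start)),
                    Optional.ofNullable(b.get(start))));
        }
        return out;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;

        Map<Long, String> a = new TreeMap<>();
        a.put(0L, "A[0h]");
        a.put(hour, "A[1h]");

        Map<Long, String> b = new TreeMap<>();
        b.put(hour, "B[1h]");
        b.put(2 * hour, "B[2h]");

        // Three windows in total: only A, both sides, only B.
        for (Zipped<String, String> z : zip(a, b)) {
            System.out.println(z);
        }
    }
}
```

In other words, I am looking for the streaming equivalent of a full outer join on (key, window), where the first window above has only the left side, the second has both, and the third has only the right side.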