I am calculating the maximum value of a simple steam and the result is:
(S1,1000,S1, value: 999)
(S1,2000,S1, value: 41)
The last line of data is obviously late: new SensorReading("S1", 999, 100L)
why was it calculated by the first window(0-1000)?
I think that the first window should be fired when SensorReading("S1", 41, 1000L)
I am very confused about this result.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SensorReading> input = env.fromElements(
new SensorReading("S1", 35, 500L),
new SensorReading("S1", 42, 999L),
new SensorReading("S1", 41, 1000L),
new SensorReading("S1", 40, 1200L),
new SensorReading("S1", 23, 1400L),
new SensorReading("S1", 999, 100L)
input.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
private long currentMaxTimestamp;
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp);
public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
currentMaxTimestamp = element.ts;
return currentMaxTimestamp;
.keyBy((KeySelector<SensorReading, String>) value -> value.sensorName)
.reduce(new MyReducingMax(), new MyWindowFunction())
MyReducingMax(), MyWindowFunction()
private static class MyReducingMax implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.getValue() > r2.getValue() ? r1 : r2;
private static class MyWindowFunction extends
ProcessWindowFunction<SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {
public void process(
String key,
Context context,
Iterable<SensorReading> maxReading,
Collector<Tuple3<String, Long, SensorReading>> out) {
SensorReading max = maxReading.iterator().next();
out.collect(new Tuple3<>(key, context.window().getEnd(), max));
public static class SensorReading {
String sensorName;
int value;
Long ts;
public SensorReading() {
public SensorReading(String sensorName, int value, Long ts) {
this.sensorName = sensorName;
this.value = value;
this.ts = ts;
public Long getTs() {
return ts;
public void setTs(Long ts) {
this.ts = ts;
public String getSensorName() {
return sensorName;
public void setSensorName(String sensorName) {
this.sensorName = sensorName;
public int getValue() {
return value;
public void setValue(int value) {
this.value = value;
public String toString() {
return this.sensorName + "(" + this.ts + ") value: " + this.value;