From Flink's official doc:
The example code is:
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream
.keyBy(elem => /* select key */)
.intervalJoin(greenStream.keyBy(elem => /* select key */))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new ProcessJoinFunction[Integer, Integer, String] {
override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
out.collect(left + "," + right);
}
});
});
From the above code, I would like know how to specify the starting time(eg, from the beginning of today) from which to perform this interval join(the data before the starting time will not take into account).
eg, I have run the program for 3 days, I don't want to perform this join for all the data for 3 days, I just want to perform the join for the data generated today.