I join two streams to create a new stream. The code is as follows:
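```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Join the two streams by key in event-time session windows.
DataStream<NewTableA> join1 =
    oldTableADataStream
        .keyBy(t -> t.getFa3())
        .join(tableBDataStream)
        .where(new oldTableAKeySelector())
        .equalTo(new TableBKeySelector())
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(WIN_GAP_TIME)))
        .allowedLateness(Time.milliseconds(allowedLateness))
        .apply(new oldTableAJoinTableBFunc());
        //.assignTimestampsAndWatermarks(new assignTSAndWMLastMax<>(maxOutOfOrderness));

// Debug output: print the current watermark and each joined record's timestamp.
join1.process(
    new ProcessFunction<NewTableA, NewTableA>() {
        @Override
        public void processElement(NewTableA value, Context ctx, Collector<NewTableA> out)
                throws Exception {
            System.out.println(" NewTableA wmts:" + ctx.timerService().currentWatermark());
            System.out.println(" NewTableA ts:" + ctx.timestamp() + " " + value);
        }
    });
```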
The code of oldTableAJoinTableBFunc is as follows:
```java
import org.apache.flink.api.common.functions.JoinFunction;

// Combines one OldTableA record and one matching TableB record into a NewTableA.
public class oldTableAJoinTableBFunc implements JoinFunction<OldTableA, TableB, NewTableA> {
    @Override
    public NewTableA join(OldTableA oldTableA, TableB tableB) throws Exception {
        //System.out.println("join1 on");
        NewTableA newTableA = new NewTableA();
        newTableA.setPA1(oldTableA.getPa1());
        newTableA.setA2(oldTableA.getA2());
        newTableA.setFA3(oldTableA.getFa3());
        newTableA.setFA4(oldTableA.getFa4());
        newTableA.setB2(tableB.getB2());
        newTableA.setB3(tableB.getB3());
        // Important: keep the original event time of oldTableA in the ts field.
        newTableA.setTs(oldTableA.getTs());
        return newTableA;
    }
}
```
In the example above, oldTableADataStream is joined with tableBDataStream on event time to produce the join1 stream.
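I noticed an interesting phenomenon: the timestamps of the events in join1 are generated automatically by Flink.
When I created the test data for oldTableADataStream and tableBDataStream, I intentionally set all event timestamps within the range 1000000010 to 1000000044. But after the join and the apply function, the timestamps of the events in the new stream join1 were changed by Flink. The output is as follows (ctx.timestamp() is printed after "ts:", while the original event time is the ts field inside NewTableA{...}):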
```
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=10, A2='a20', FA3=21, B2='b21', B3='b31', FA4=39, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=1, A2='a20', FA3=20, B2='b20', B3='b30', FA4=30, C2='null', FC3=null, D2='null', D3='null', ts=1000000010}
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=11, A2='a20', FA3=21, B2='b21', B3='b31', FA4=40, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000135 NewTableA{PA1=21, A2='a20', FA3=22, B2='b22', B3='b32', FA4=30, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000138 NewTableA{PA1=38, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000015}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000138 NewTableA{PA1=57, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
NewTableA ts:1000000143 NewTableA{PA1=2, A2='a20', FA3=20, B2='b20', B3='b30', FA4=31, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=12, A2='a20', FA3=21, B2='b21', B3='b31', FA4=41, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000135 NewTableA{PA1=22, A2='a20', FA3=22, B2='b22', B3='b32', FA4=31, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA ts:1000000138 NewTableA{PA1=58, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=13, A2='a20', FA3=21, B2='b21', B3='b31', FA4=42, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000138 NewTableA{PA1=39, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000015}
NewTableA wmts:1000000042
NewTableA ts:1000000138 NewTableA{PA1=59, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
NewTableA ts:1000000135 NewTableA{PA1=23, A2='a20', FA3=22, B2='b22', B3='b32', FA4=32, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000143 NewTableA{PA1=3, A2='a20', FA3=20, B2='b20', B3='b30', FA4=32, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
NewTableA ts:1000000143 NewTableA{PA1=14, A2='a20', FA3=21, B2='b21', B3='b31', FA4=43, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA ts:1000000138 NewTableA{PA1=40, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000016}
NewTableA ts:1000000135 NewTableA{PA1=24, A2='a20', FA3=22, B2='b22', B3='b32', FA4=33, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
NewTableA wmts:1000000042
NewTableA wmts:1000000042
......
```
There seems to be no obvious rule for how the timestamp of a new event is created. How were 1000000143, 1000000138, 1000000135, and so on calculated? They do not seem to have anything to do with the watermark: the watermark is 1000000042, which differs from the event timestamps printed at the same time.
What rule does the operation rely on to generate the new event timestamps? I haven't found any official documentation on this; can anyone point me to it?
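One rule that seems worth checking against these numbers: as far as I know, Flink's window operators (which the window join is built on) stamp each window result with the window's maxTimestamp(), i.e. the window end minus 1 ms, and an event-time session window ends at its latest element's timestamp plus the gap. The plain-Java sketch below models only that arithmetic and does not call Flink. The class and method names are hypothetical, and since the value of WIN_GAP_TIME is not shown, the 100 ms gap in main is an assumption picked to see whether it reproduces 1000000143.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java model (no Flink dependency) of the timestamp a
// session-window result would carry, under these assumptions:
//   * each element at time t opens a window [t, t + gap)
//   * windows that overlap or touch are merged into one session
//   * a window's result is stamped with maxTimestamp() = end - 1
// For a window join, the timestamps of BOTH inputs with the same key count.
class SessionWindowTs {

    // Returns the result timestamp of every session formed by one key's
    // element timestamps, in ascending order.
    static List<Long> resultTimestamps(List<Long> elementTimestamps, long gapMillis) {
        List<Long> sorted = new ArrayList<>(elementTimestamps);
        Collections.sort(sorted);

        List<Long> results = new ArrayList<>();
        Long sessionEnd = null; // exclusive end of the session being built
        for (long t : sorted) {
            if (sessionEnd != null && t > sessionEnd) {
                // t's window [t, t + gap) no longer touches the session: close it.
                results.add(sessionEnd - 1);
            }
            // Input is sorted, so the session end only moves forward.
            sessionEnd = t + gapMillis;
        }
        if (sessionEnd != null) {
            results.add(sessionEnd - 1);
        }
        return results;
    }

    public static void main(String[] args) {
        // Hypothetical inputs: a 100 ms gap and the range endpoints from the question.
        List<Long> ts = List.of(1000000010L, 1000000044L);
        System.out.println(resultTimestamps(ts, 100L)); // prints [1000000143]
    }
}
```

If the real gap were 100 ms, 1000000138 and 1000000135 would correspond to keys whose latest element (from either input) is at 1000000039 and 1000000036; that is a prediction to compare with the test data, not something the output above confirms.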