I join two streams to create a new stream and then process the result. The code is as follows:

DataStream<NewTableA> join1 =
    oldTableADataStream
        .keyBy(t -> t.getFa3())
        .join(tableBDataStream)
        .where(new oldTableAKeySelector())
        .equalTo(new TableBKeySelector())
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(WIN_GAP_TIME)))
        .allowedLateness(Time.milliseconds(allowedLateness))
        .apply(new oldTableAJoinTableBFunc());
        //.assignTimestampsAndWatermarks(new assignTSAndWMLastMax<>(maxOutOfOrderness));


join1.process(
    new ProcessFunction<NewTableA, NewTableA>() {
      @Override
      public void processElement(NewTableA value, Context ctx, Collector<NewTableA> out)
          throws Exception {
        System.out.println(" NewTableA wmts:" + ctx.timerService().currentWatermark());
        System.out.println(" NewTableA ts:" + ctx.timestamp() + " " + value);
      }
    });

The code of oldTableAJoinTableBFunc is as follows:

public class oldTableAJoinTableBFunc implements JoinFunction<OldTableA, TableB, NewTableA> {



  @Override
  public NewTableA join(OldTableA oldTableA, TableB tableB) throws Exception {

    //System.out.println("join1 on");

    NewTableA newTableA = new NewTableA();

    newTableA.setPA1(oldTableA.getPa1());
    newTableA.setA2(oldTableA.getA2());
    newTableA.setFA3(oldTableA.getFa3());
    newTableA.setFA4(oldTableA.getFa4());
    newTableA.setB2(tableB.getB2());
    newTableA.setB3(tableB.getB3());
    // Important: carry over the event time field from oldTableA.
    newTableA.setTs(oldTableA.getTs());

    return newTableA;
  }
}

In the example above, oldTableADataStream is joined with tableBDataStream on event time to produce the join1 stream.

I found an interesting phenomenon: the timestamps of the events in join1 are generated automatically by Flink.

When I created the test data for oldTableADataStream and tableBDataStream, I intentionally set all the timestamps between 1000000010 and 1000000044. But after the join and the apply function, the timestamps of the events in the new stream join1 have been changed by Flink. The output is as follows:

NewTableA wmts:1000000042
 NewTableA ts:1000000143 NewTableA{PA1=10, A2='a20', FA3=21, B2='b21', B3='b31', FA4=39, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA ts:1000000143 NewTableA{PA1=1, A2='a20', FA3=20, B2='b20', B3='b30', FA4=30, C2='null', FC3=null, D2='null', D3='null', ts=1000000010}
 NewTableA wmts:1000000042
 NewTableA ts:1000000143 NewTableA{PA1=11, A2='a20', FA3=21, B2='b21', B3='b31', FA4=40, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA ts:1000000135 NewTableA{PA1=21, A2='a20', FA3=22, B2='b22', B3='b32', FA4=30, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA ts:1000000138 NewTableA{PA1=38, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000015}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA ts:1000000138 NewTableA{PA1=57, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
 NewTableA ts:1000000143 NewTableA{PA1=2, A2='a20', FA3=20, B2='b20', B3='b30', FA4=31, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA wmts:1000000042
 NewTableA ts:1000000143 NewTableA{PA1=12, A2='a20', FA3=21, B2='b21', B3='b31', FA4=41, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA ts:1000000135 NewTableA{PA1=22, A2='a20', FA3=22, B2='b22', B3='b32', FA4=31, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA wmts:1000000042
 NewTableA ts:1000000138 NewTableA{PA1=58, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA ts:1000000143 NewTableA{PA1=13, A2='a20', FA3=21, B2='b21', B3='b31', FA4=42, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA ts:1000000138 NewTableA{PA1=39, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000015}
 NewTableA wmts:1000000042
 NewTableA ts:1000000138 NewTableA{PA1=59, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
 NewTableA ts:1000000135 NewTableA{PA1=23, A2='a20', FA3=22, B2='b22', B3='b32', FA4=32, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA ts:1000000143 NewTableA{PA1=3, A2='a20', FA3=20, B2='b20', B3='b30', FA4=32, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA ts:1000000143 NewTableA{PA1=14, A2='a20', FA3=21, B2='b21', B3='b31', FA4=43, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA ts:1000000138 NewTableA{PA1=40, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000016}
 NewTableA ts:1000000135 NewTableA{PA1=24, A2='a20', FA3=22, B2='b22', B3='b32', FA4=33, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 ......

The new event timestamps don't seem to follow any rule. How were 1000000143, 1000000138, 1000000135 and so on calculated? They don't seem to have anything to do with the watermark, because the watermark is 1000000042, which differs from the event timestamps printed at the same time.

What rules does the operator rely on to generate the new event timestamps? I haven't found this in the official documentation. Can anyone give a link to it?

1 Answer

The output events created by time windows have their timestamps set to the maximum timestamp that falls within the window, which is the window end minus 1 ms. So it's basically the session boundary of the session containing the event, not the timestamp of any input event.

Since you have keyed session windows, based on FA3, you're getting values that depend on FA3: 1000000143 for 20 and 21, 1000000135 for 22, and 1000000138 for 23 and 24.
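
To make the arithmetic concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a session's maximum timestamp could be derived. It assumes each event opens a window [ts, ts + gap), that overlapping or touching windows are merged, and that the emitted timestamp is the merged window's end minus 1. The question doesn't state the value of WIN_GAP_TIME; the 100 ms gap below is an assumption, chosen because 1000000044 + 100 - 1 = 1000000143 matches the printed output. The class name, method name, and input timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionTimestampSketch {

    // Returns, for each merged session, the maximum timestamp inside it (window end - 1).
    static List<Long> sessionMaxTimestamps(long[] eventTimestamps, long gapMs) {
        long[] sorted = eventTimestamps.clone();
        Arrays.sort(sorted);
        List<Long> result = new ArrayList<>();
        if (sorted.length == 0) {
            return result;
        }
        // The first event opens a window [ts, ts + gap).
        long end = sorted[0] + gapMs;
        for (int i = 1; i < sorted.length; i++) {
            long start = sorted[i];
            if (start <= end) {
                // This event's window overlaps or touches the current session: merge.
                end = Math.max(end, start + gapMs);
            } else {
                // Gap exceeded: close the current session and start a new one.
                result.add(end - 1);
                end = start + gapMs;
            }
        }
        result.add(end - 1);
        return result;
    }

    public static void main(String[] args) {
        long gapMs = 100L; // assumed value of WIN_GAP_TIME, not given in the question
        long[] timestamps = {1000000010L, 1000000011L, 1000000044L}; // hypothetical events for one key
        System.out.println(sessionMaxTimestamps(timestamps, gapMs)); // prints [1000000143]
    }
}
```

Under the same assumed gap, a session whose last event is at 1000000036 would yield 1000000135, and one ending at 1000000039 would yield 1000000138.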