
I am using the Flink SQL API in Flink 1.8. I have two stream tables, Table1 and Table2.

If we define receivedTime as the time at which the data was received in a table, I want to join Table1 and Table2 (on some id) and keep only the rows where Table1.receivedTime > Table2.receivedTime.

First, I tried to do this using Flink SQL's CURRENT_TIMESTAMP:

NEW_TABLE1 : SELECT *, CURRENT_TIMESTAMP AS receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, CURRENT_TIMESTAMP AS receivedTime FROM TABLE2
RESULT     : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2
                    ON NEW_TABLE1.id = NEW_TABLE2.id
                    WHERE NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime

But it looks like CURRENT_TIMESTAMP always returns the timestamp of when the query was evaluated. (It seems CURRENT_TIMESTAMP is replaced with the current date at that moment and is not a dynamic value.) I find this behavior weird; is it normal?

The second solution I tried was to use Flink's processing time:

NEW_TABLE1 : SELECT *, proctime AS receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, proctime AS receivedTime FROM TABLE2
RESULT     : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2
                    ON NEW_TABLE1.id = NEW_TABLE2.id
                    WHERE NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime

But in this case, it looks like the processing time is evaluated at the time the query is executed, so in my JOIN query the two processing times are always equal.

What is the correct way to do what I want?


1 Answer


Flink and Flink SQL support two different notions of time: processing time is the time when an event is being processed (or in other words, the time when your query is being executed), while event time is based on timestamps recorded in the events. How this distinction is reflected in the Table and SQL APIs is described here in the documentation.
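
For context, in Flink 1.8 the time characteristic is chosen on the execution environment, before any tables are defined. A minimal sketch:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use timestamps carried in the records (event time) rather than the
// Flink 1.8 default, which is processing time.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);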

To get what you want, you'll first need to arrange for whatever process is creating the data in the two tables to include an event time timestamp in each record. Then you'll need to configure your tables so that Flink SQL is aware of which field in each table is to be used as the rowtime attribute, and you'll also need to specify how watermarking is to be done.

For example, if you are using the SQL client, your schema might look something like this, indicating that the rideTime field supplies the event-time timestamps, with a periodic bounded-out-of-orderness watermark strategy and a delay of 60 seconds:

schema:
  - name: rowTime
    type: TIMESTAMP
    rowtime:
      timestamps:
        type: "from-field"
        from: "rideTime"
      watermarks:
        type: "periodic-bounded"
        delay: "60000"

If you're not using the SQL client, see the documentation for examples, whether using DataStream to Table conversion or TableSources.
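
For instance, a DataStream-to-Table conversion might look roughly like this. This is only a sketch against the Flink 1.8 Table API: Event is a placeholder POJO, and id / receivedTime are assumed field names; the stream must already carry timestamps and watermarks:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Register a stream as Table1, exposing an event-time attribute.
public static void registerTable1(StreamExecutionEnvironment env,
                                  DataStream<Event> events) {
  StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
  // The .rowtime suffix appends receivedTime as an event-time attribute
  // derived from the record timestamps already present on the stream.
  Table table1 = tEnv.fromDataStream(events, "id, receivedTime.rowtime");
  tEnv.registerTable("Table1", table1);
}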

Update:

What you'd really prefer, I gather, is to work with ingestion time, but Flink SQL doesn't support ingestion time. You'll have to configure the job to use TimeCharacteristic.EventTime, implement a timestamp extractor and watermark generator, and call assignTimestampsAndWatermarks.

If you don't want to bother with having a timestamp field in each event, your timestamp extractor can look like this:

// AscendingTimestampExtractor implements AssignerWithPeriodicWatermarks.
AssignerWithPeriodicWatermarks<Event> assigner = new AscendingTimestampExtractor<Event>() {
  @Override
  public long extractAscendingTimestamp(Event element) {
    // Ignore the event's contents and stamp it with the current
    // wall-clock time, which is naturally non-decreasing.
    return System.currentTimeMillis();
  }
};
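
You'd then attach this assigner to the stream before converting it to a table, along these lines (the stream and field names are again placeholders):

// Stamp each record with the wall-clock time at which Flink first sees it;
// these timestamps then back the receivedTime rowtime attribute.
DataStream<Event> stamped = events.assignTimestampsAndWatermarks(assigner);
Table table1 = tEnv.fromDataStream(stamped, "id, receivedTime.rowtime");

Since each record is stamped on arrival, this effectively reproduces ingestion time using the event-time machinery.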