I am using the Flink SQL API in Flink 1.8. I have two stream tables Table1 and Table2.
If we define receivedTime as the time at which the data was received in a table, I want to join Table1 and Table2 (on some id) and keep only the rows where Table1.receivedTime > Table2.receivedTime.
First, I tried to do this using Flink SQL's CURRENT_TIMESTAMP:
NEW_TABLE1 : SELECT *, CURRENT_TIMESTAMP as receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, CURRENT_TIMESTAMP as receivedTime FROM TABLE2
RESULT : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2
WHERE NEW_TABLE1.id = NEW_TABLE2.id
AND NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime
But it looks like CURRENT_TIMESTAMP always returns the timestamp of when the query was evaluated. (It seems that CURRENT_TIMESTAMP is replaced with the current date at that moment and is not a dynamic value.) I find this behavior weird; is it normal?
The second solution I tried was to use Flink's processing time:
NEW_TABLE1 : SELECT *, proctime as receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, proctime as receivedTime FROM TABLE2
RESULT : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2
WHERE NEW_TABLE1.id = NEW_TABLE2.id
AND NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime
But in this case, it looks like the processing time is evaluated at the time the query is executed, so in my JOIN query the two processing times are always equal.
What is the correct way to do what I want?