I'm trying to join two types of events (say event A and event B) in Flink and want to confirm that my understanding is correct. Some properties of the events -
- Event A flows into Flink with a small delay (5-10 minutes)
- Event B flows in with a larger delay (15-30 minutes)
- There is a 1:1 join between event A and event B
I have configured event A's datastream with a BoundedOutOfOrdernessTimestampExtractor of 10 minutes, and event B's datastream with one of 30 minutes. I then do a time-windowed join using the Table API.
Is my understanding correct about the following -
- Events are processed and joined as soon as they are received, as long as they arrive within the out-of-orderness bound (10 minutes for event A, 30 minutes for event B). None of Flink's configuration imposes a minimum end-to-end latency.
- The Table would hold onto the events for a maximum of 30 minutes, until the watermarks from both streams have advanced far enough; the events are then cleaned up based on those watermarks
- The query configuration in the code below is redundant and not really required
Any other suggestions about the code below?
queryConfig.withIdleStateRetentionTime(
  org.apache.flink.api.common.time.Time.seconds(1),
  org.apache.flink.api.common.time.Time.minutes(30))
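For context on the third bullet: my understanding is that this retention setting only takes effect if the config object is actually passed along when converting a table back into a stream, which the code below never does. A minimal sketch, assuming the pre-1.11 StreamQueryConfig API and the result table defined further down -

// Sketch only: the query config would have to be handed to the conversion
// for the idle state retention to have any effect.
val streamWithRetention = tableEnv
  .toAppendStream[(String, String, String, Timestamp, Timestamp)](result, queryConfig)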
val stream: DataStream[Any] = textStream.flatMap(json => convert(json))

val aStream: DataStream[ClassA] =
  stream
    .filter(obj => obj.isInstanceOf[ClassA])
    .rebalance
    .map(obj => obj.asInstanceOf[ClassA])
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[ClassA](Time.minutes(10)) {
        override def extractTimestamp(element: ClassA): Long =
          element.serviceTimestamp.toInstant.toEpochMilli
      })

val bStream: DataStream[ClassB] =
  stream
    .filter(obj => obj.isInstanceOf[ClassB])
    .rebalance
    .map(obj => obj.asInstanceOf[ClassB])
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[ClassB](Time.minutes(30)) {
        override def extractTimestamp(element: ClassB): Long =
          element.timestamp.toInstant.toEpochMilli
      })

val aTable: Table = tableEnv.fromDataStream[ClassA](aStream,
  // The .rowtime is for setting event time attributes
  'aTimestamp.rowtime as 'aTimestamp, 'aUniqueId, 'aItem)

val bTable: Table = tableEnv.fromDataStream[ClassB](bStream,
  // The .rowtime is for setting event time attributes
  // https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
  'bTimestamp.rowtime as 'bTimestamp, 'uniqueId, 'bItem)
val result: Table = aTable
  .join(bTable)
  .where('aUniqueId === 'uniqueId
    // Give ClassB events 30 minutes lateness.
    // Use a time window join as an optimization - https://stackoverflow.com/a/51620821
    // & https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#time-windowed-joins
    // Both time clauses are needed to qualify as a time-windowed join
    && 'bTimestamp >= 'aTimestamp
    && 'bTimestamp <= 'aTimestamp + 30.minutes)
  // DO NOT change order without changing order in later parsing code
  .select('uniqueId, 'aItem, 'bItem, 'bTimestamp, 'aTimestamp.cast(createTypeInformation[Timestamp]))
val outputStream: DataStream[ClassC] = tableEnv
  .toAppendStream[(String, String, String, Timestamp, Timestamp)](result)
  // TODO find better way to map to a POJO
  .map(row => ClassCUtils.toClassC(row))
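On the TODO above: if ClassC were a case class (or POJO) whose fields line up with the selected columns, my understanding is that the Table could be converted into it directly, skipping the intermediate tuple. A rough sketch, assuming ClassC looks something like this -

// Sketch only: assumes ClassC is a case class whose fields match the
// columns selected from the joined table.
case class ClassC(
  uniqueId: String,
  aItem: String,
  bItem: String,
  bTimestamp: java.sql.Timestamp,
  aTimestamp: java.sql.Timestamp)

val outputStream: DataStream[ClassC] = tableEnv.toAppendStream[ClassC](result)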