In our streaming application, which uses Flink 1.55 and its Table API, I need to detect and handle late elements. I can't find an equivalent of the DataStream API's .sideOutputLateData(...).

I searched the Flink documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/tableApi.html) and Google extensively, but found nothing useful.

Example:

table
  .window(Tumble over windowLengthInMinutes.minutes on 'timeStamp as 'timeWindow)
  .groupBy(..fields list)
  .select(..fields)

The code above works as expected. The problem is that elements arriving late (as defined by the window size and the allowed lateness) are discarded. Is there a way to handle these late elements natively in the Table API?
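For comparison, here is roughly what the DataStream API feature mentioned in the question looks like. This is a minimal sketch assuming Flink 1.8's Scala API; the `Event` case class, its field names, and the window size (1 minute), allowed lateness (10 s) and out-of-orderness bound (5 s) are hypothetical stand-ins for the fields elided in the query above. The tiny `fromElements` input only makes the job runnable; it is not constructed to produce late records.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record type standing in for the elided table fields
case class Event(key: String, timeStamp: Long, value: Int)

object LateDataSideOutput {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Records arriving after window end + allowed lateness are routed here
    val lateTag = OutputTag[Event]("late-events")

    val events: DataStream[Event] = env
      .fromElements(Event("a", 1000L, 1), Event("a", 2000L, 1))
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(5)) {
          override def extractTimestamp(e: Event): Long = e.timeStamp
        })

    val windowed = events
      .keyBy(_.key)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .allowedLateness(Time.seconds(10))
      .sideOutputLateData(lateTag) // keep late records instead of dropping them
      .sum("value")

    windowed.print()                        // regular window results
    windowed.getSideOutput(lateTag).print() // late records
    env.execute("late-data-side-output")
  }
}
```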

It doesn't seem like this feature is supported directly in the Table API. You could convert the table to a DataStream and set the side output on that. (Yuval Itzchakov)

3 Answers

As of Flink 1.8.0, the Table API does not seem to support this directly. One workaround is to convert your table into a DataStream[Row] and set the side output on that:

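import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val outputTag = OutputTag[String]("side-output")

val flink = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(flink)

// Make sure the source emits data to the selected side output
tableEnv.registerTableSource(...)
val table = tableEnv.sqlQuery("QUERY")

// Can also be toAppendStream, depending on the underlying table output.
// toRetractStream needs the row type and yields a DataStream[(Boolean, Row)],
// where the Boolean flag marks an insert (true) or a retraction (false).
val dataStream = tableEnv.toRetractStream[Row](table)
val sideOutputStream = dataStream.getSideOutput(outputTag)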

I found a solution. I was already using BoundedOutOfOrdernessTimestampExtractor, which provides the current watermark timestamp. I used this information to split the input stream and process the late stream separately.
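The watermark rule this relies on can be modelled without Flink. The sketch below is a hypothetical, simplified re-implementation, not Flink's code: like BoundedOutOfOrdernessTimestampExtractor, its watermark is the largest timestamp seen so far minus the out-of-orderness bound, and a record counts as late for a tumbling window once the window's last timestamp (end minus 1 ms) plus the allowed lateness is at or below the watermark. It assumes non-negative timestamps, a window offset of 0, and a watermark updated after every record (Flink emits watermarks periodically). `WatermarkTracker`, `Lateness`, `windowEnd` and `isLate` are illustrative names.

```scala
// Hypothetical, simplified model of BoundedOutOfOrdernessTimestampExtractor
// plus a tumbling-window lateness check; this is not Flink code.
final class WatermarkTracker(maxOutOfOrdernessMs: Long) {
  // Start low enough that the first watermark does not underflow
  private var maxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

  // Called for every record, like extractTimestamp()
  def observe(timestampMs: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, timestampMs)

  // Largest timestamp seen minus the bound; never decreases
  def currentWatermark: Long = maxTimestamp - maxOutOfOrdernessMs
}

object Lateness {
  // Exclusive end of the tumbling window containing ts (ts >= 0, offset 0)
  def windowEnd(ts: Long, windowSizeMs: Long): Long =
    ts - (ts % windowSizeMs) + windowSizeMs

  // Late once (window end - 1) + allowedLateness <= watermark
  def isLate(ts: Long, watermark: Long, windowSizeMs: Long, allowedLatenessMs: Long): Boolean =
    windowEnd(ts, windowSizeMs) - 1 + allowedLatenessMs <= watermark
}
```

For example, with a 5 s bound, a 60 s window and 10 s of allowed lateness, observing a record at 80000 ms moves the watermark to 75000 ms, so a record stamped 2000 ms (window [0, 60000)) is late, while one stamped 65000 ms is not.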

Here is the code example for splitting:

  val mainSource = env.addSource(someSource)

  // Route each element to the "late" or "regular" split, using the
  // custom isLate() check on the element's timestamp
  val splitted = mainSource.split(
    (x: DataKpi) => if (isLate(x.getTimestamp)) List("late") else List("regular")
  )
  val regularSource = splitted.select("regular")
  val lateSource = splitted.select("late")

regularSource and lateSource are new streams that are processed separately later on. Before using this approach, we were encountering some duplicates. The isLate() function is a custom function that decides whether an element is probably late. In my case, it uses the current watermark provided by BoundedOutOfOrdernessTimestampExtractor.getCurrentWatermark.getTimestamp.
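Flink's side outputs offer an alternative to `split`/`select` for the same routing. The sketch below swaps in that technique, assuming Flink 1.8's Scala API: a `ProcessFunction` reads the watermark of its own operator via `ctx.timerService().currentWatermark()` instead of querying the extractor instance, and sends late records to an `OutputTag`. The `DataKpi` fields, `LateSplit` and `RouteLate` are hypothetical, and "late" here just means timestamp at or below the current watermark; a window-aware check would also account for window size and allowed lateness.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical record type; the original DataKpi class is not shown
case class DataKpi(id: String, timestamp: Long)

object LateSplit {
  // Late records go to this side output instead of a "late" split
  val lateTag: OutputTag[DataKpi] = OutputTag[DataKpi]("late")

  // On-time records go to the main output, late ones to lateTag
  class RouteLate extends ProcessFunction[DataKpi, DataKpi] {
    override def processElement(
        value: DataKpi,
        ctx: ProcessFunction[DataKpi, DataKpi]#Context,
        out: Collector[DataKpi]): Unit = {
      // Watermark as seen by this operator
      val watermark = ctx.timerService().currentWatermark()
      if (value.timestamp <= watermark) ctx.output(lateTag, value)
      else out.collect(value)
    }
  }

  // Returns (regular stream, late stream)
  def split(input: DataStream[DataKpi]): (DataStream[DataKpi], DataStream[DataKpi]) = {
    val regular = input.process(new RouteLate)
    (regular, regular.getSideOutput(lateTag))
  }
}
```

Usage would look like `val (regularSource, lateSource) = LateSplit.split(mainSource)`, provided timestamps and watermarks are assigned upstream.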