I have recently been trying to use Apache Flink for fast batch processing. I have a table with a value column and an irrelevant index column.

Basically, I want to calculate the mean and range of every 5 rows of value. Then I am going to calculate the mean and standard deviation of those means. So I guess the best way is to use a Tumble window.

It looks like this:

DataSet<Tuple2<Double, Integer>> rawData = {get the source data};
Table table = tableEnvironment.fromDataSet(rawData);
Table groupedTable = table
            .window(Tumble.over("5.rows").on({what should I write?}).as("w"))
            .groupBy("w")
            .select("f0.avg, f0.max-f0.min");

{The next step is to use groupedTable to calculate overall mean and stdDev} 

But I don't know what to write in .on(). I have tried "proctime", but it says there is no such input. I just want it to group rows in the order they are read from the source. But it has to be a time attribute, so I cannot use "f2" (the index column) for ordering either.

Do I have to add a timestamp to do this? Is it necessary in batch processing and will it slow down the calculation? What is the best way to solve this?

Update: I tried to use a sliding window in the Table API and it throws an exception.

// Calculate the mean value in each group
Table groupedTable = table
            .groupBy("f0")
            .select("f0.cast(LONG) as groupNum, f1.avg as avg")
            .orderBy("groupNum");

// Calculate the moving range of the group means using a sliding window
Table movingRangeTable = groupedTable
            .window(Slide.over("2.rows").every("1.rows").on("groupNum").as("w"))
            .groupBy("w")
            .select("groupNum.max as groupNumB, (avg.max - avg.min) as MR");

The Exception is:

Exception in thread "main" java.lang.UnsupportedOperationException: Count sliding group windows on event-time are currently not supported.
    at org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.createEventTimeSlidingWindowDataSet(DataSetWindowAggregate.scala:456)
    at org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:139)
    ...

Does that mean that sliding windows are not supported in the Table API? If I recall correctly, there are no window functions in the DataSet API. Then how do I calculate a moving range in a batch process?

1 Answer

The window clause is used to define a grouping based on a window function, such as Tumble or Session. Grouping every 5 rows is not well defined in the Table API (or SQL) unless you specify the order of the rows. This is done in the on clause of the Tumble function. Since this feature originates from stream processing, the on clause expects a timestamp attribute.

You can fetch the timestamp of the current time using the currentTimestamp() function. However, I should point out that Flink will sort the data, as it is not aware of the monotonic property of the function. Moreover, all of that will run with a parallelism of 1 because there is no clause that would allow for partitioning.

Alternatively, you can also implement a user-defined scalar function that converts the index attribute into a timestamp (effectively a Long value). But again, Flink will do a full sort of the data.
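
To make the target computation concrete (for example, to check results on a small sample), here is a minimal plain-Java sketch. It is not Flink code and does not use any Flink API. It cuts the rows, in input order, into blocks of 5, computes each block's mean and range, then the mean and standard deviation of the block means, and finally the moving range of consecutive block means. The class and method names (RowGroupStats, tumble, means, sampleStdDev, movingRange) are made up for illustration, and using the sample standard deviation (dividing by n - 1) is an assumption, since the question does not say which variant is wanted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: plain Java, no Flink. All names here are hypothetical.
class RowGroupStats {

    /** Mean and range (max - min) of one block of rows. */
    static final class GroupStat {
        final double mean;
        final double range;

        GroupStat(double mean, double range) {
            this.mean = mean;
            this.range = range;
        }
    }

    /** Cuts values into consecutive blocks of `size` rows in input order; a shorter last block is kept. */
    static List<GroupStat> tumble(double[] values, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<GroupStat> out = new ArrayList<>();
        for (int start = 0; start < values.length; start += size) {
            int end = Math.min(start + size, values.length);
            double sum = 0;
            double min = Double.POSITIVE_INFINITY;
            double max = Double.NEGATIVE_INFINITY;
            for (int i = start; i < end; i++) {
                sum += values[i];
                min = Math.min(min, values[i]);
                max = Math.max(max, values[i]);
            }
            out.add(new GroupStat(sum / (end - start), max - min));
        }
        return out;
    }

    /** Extracts the block means as an array, preserving block order. */
    static double[] means(List<GroupStat> groups) {
        double[] out = new double[groups.size()];
        for (int i = 0; i < out.length; i++) {
            out[i] = groups.get(i).mean;
        }
        return out;
    }

    /** Arithmetic mean; NaN for an empty array. */
    static double mean(double[] xs) {
        double sum = 0;
        for (double x : xs) {
            sum += x;
        }
        return sum / xs.length;
    }

    /** Sample standard deviation (divides by n - 1, an assumption); NaN for fewer than two values. */
    static double sampleStdDev(double[] xs) {
        if (xs.length < 2) {
            return Double.NaN;
        }
        double m = mean(xs);
        double squares = 0;
        for (double x : xs) {
            squares += (x - m) * (x - m);
        }
        return Math.sqrt(squares / (xs.length - 1));
    }

    /** Moving range over a window of 2 rows sliding by 1: |x[i] - x[i-1]|, i.e. max - min of each pair. */
    static double[] movingRange(double[] xs) {
        double[] out = new double[Math.max(0, xs.length - 1)];
        for (int i = 1; i < xs.length; i++) {
            out[i - 1] = Math.abs(xs[i] - xs[i - 1]);
        }
        return out;
    }

    public static void main(String[] args) {
        double[] values = {1, 2, 3, 4, 5, 2, 4, 6, 8, 10};
        double[] groupMeans = means(tumble(values, 5));
        System.out.println("group means:   " + Arrays.toString(groupMeans));
        System.out.println("overall mean:  " + mean(groupMeans));
        System.out.println("overall stdev: " + sampleStdDev(groupMeans));
        System.out.println("moving range:  " + Arrays.toString(movingRange(groupMeans)));
    }
}
```

Note that this runs in a single thread on data that is already in order, which mirrors the point above: row-count windows only make sense once a total order is fixed, and that is what forces the sort and the parallelism of 1 in Flink.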