0
votes

Is there any way to sort the DataStream efficiently other than WindowAll operation?

Let's take the use case of page view that is defined in the Flink example ClickEventCount. This example tries to aggregate the page view count for every 15 minutes window.

What is an efficient way to change this to get the top 3 page views for each 15-minute window?

One option is to apply windowAll after the aggregate function and do an in-memory sort. The problem is that windowAll reduces the parallelism to 1 and requires keeping all the data in the same task slot to perform the sort. This is fine when there are few distinct keys (i.e., distinct page URLs), but in my use case there will be millions or billions of keys per 15-minute window, so all of those aggregated rows have to cross the network, and it becomes a CPU-intensive operation just to pick the top 3.

Is there a way to take the top 3 page views locally in each task slot, then use windowAll to receive just 3 elements from each task slot and sort those to pick the overall top 3? That would reduce both the network traffic and the sorting time. Is there an API or any other method to achieve this use case efficiently?

1

1 Answer

0
votes

It sounds like a Top-N query is what you're looking for.

When used on a dynamic streaming table, a top-n query will continuously calculate the "Top-N" rows based on a given attribute. The Flink SQL Cookbook has a nice example, which I'll excerpt here:

The Ministry of Magic tracks every spell a wizard casts throughout Great Britain and wants to know every wizard's Top 2 all-time favorite spells.

Flink SQL can be used to calculate continuous aggregations, so if we know each spell a wizard has cast, we can maintain a continuous total of how many times they have cast that spell.

SELECT wizard, spell, COUNT(*) AS times_cast
FROM spells_cast
GROUP BY wizard, spell;

This result can be used in an OVER window to calculate a Top-N. The rows are partitioned using the wizard column, and are then ordered based on the count of spell casts (times_cast DESC). The built-in function ROW_NUMBER() assigns a unique, sequential number to each row, starting from one, according to the rows' ordering within the partition. Finally, the results are filtered for only those rows with a row_num <= 2 to find each wizard's top 2 favorite spells.

Where Flink is most potent in this query is its ability to issue retractions. As wizards cast more spells, their top 2 will change. When this occurs, Flink will issue a retraction, modifying its output, so the result is always correct and up to date.

SELECT wizard, spell, times_cast
FROM (
    SELECT *,
    ROW_NUMBER() OVER (PARTITION BY wizard ORDER BY times_cast DESC) AS row_num
    FROM (SELECT wizard, spell, COUNT(*) AS times_cast FROM spells_cast GROUP BY wizard, spell)
)
WHERE row_num <= 2;
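To make the semantics of that query concrete, here is a plain-Java sketch (no Flink dependency; the class and method names are illustrative, not part of any API) of the same per-wizard count-and-top-2 computation over a batch of events:

```java
import java.util.*;
import java.util.stream.*;

public class Top2Spells {
    // For each wizard, count how many times each spell was cast, then keep the
    // two most-cast spells -- the same result the continuous SQL query maintains.
    public static Map<String, List<String>> top2(List<String[]> events) {
        // counts: wizard -> (spell -> times_cast)
        Map<String, Map<String, Integer>> counts = new HashMap<>();
        for (String[] e : events) { // e = {wizard, spell}
            counts.computeIfAbsent(e[0], k -> new HashMap<>())
                  .merge(e[1], 1, Integer::sum);
        }
        Map<String, List<String>> result = new HashMap<>();
        counts.forEach((wizard, spells) -> result.put(wizard,
                spells.entrySet().stream()
                      .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                      .limit(2)                 // row_num <= 2
                      .map(Map.Entry::getKey)
                      .collect(Collectors.toList())));
        return result;
    }

    public static void main(String[] args) {
        List<String[]> events = List.of(
                new String[]{"Harry", "Expelliarmus"},
                new String[]{"Harry", "Expelliarmus"},
                new String[]{"Harry", "Lumos"},
                new String[]{"Harry", "Accio"},
                new String[]{"Harry", "Lumos"},
                new String[]{"Harry", "Lumos"});
        // Lumos was cast 3 times, Expelliarmus 2, Accio 1.
        System.out.println(top2(events)); // Harry -> [Lumos, Expelliarmus]
    }
}
```

The difference in the streaming case is that Flink re-evaluates this incrementally as each event arrives, retracting and re-emitting rows whenever a wizard's top 2 changes.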

If for some reason you'd rather not use SQL / the Table API, you can always pre-aggregate with keyed windows first, and then do a final windowAll to compute the overall winners. There's an example of this pattern in the Flink training: it's described here -- https://ci.apache.org/projects/flink/flink-docs-stable/learn-flink/streaming_analytics.html#windows-can-follow-windows -- and there's an exercise/example here: http://github.com/apache/flink-training/tree/master/hourly-tips.
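The core of that two-phase pattern is exactly what you proposed: each parallel task forwards only its local top 3, and the single windowAll task merges those short lists. This is safe here because the keyed pre-aggregation guarantees each URL's full count for the window lives in exactly one task. A plain-Java sketch of the merge logic (names are illustrative; in a real job these two phases would be the per-key window output plus a ProcessAllWindowFunction):

```java
import java.util.*;
import java.util.stream.*;

public class TwoPhaseTopN {
    // Phase 1: each parallel task keeps only its local top-N page counts, so the
    // downstream windowAll step receives at most N rows per task, not every key.
    public static List<Map.Entry<String, Long>> localTopN(Map<String, Long> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }

    // Phase 2: the single windowAll task merges the small per-task lists and
    // picks the global top-N -- sorting a handful of rows, not billions.
    public static List<String> globalTopN(List<List<Map.Entry<String, Long>>> partials, int n) {
        return partials.stream()
                .flatMap(List::stream)
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Per-task aggregated counts for one 15-minute window (hypothetical URLs).
        Map<String, Long> task1 = Map.of("/home", 100L, "/a", 40L, "/b", 30L, "/c", 5L);
        Map<String, Long> task2 = Map.of("/news", 90L, "/d", 80L, "/e", 10L);
        List<String> top3 = globalTopN(
                List.of(localTopN(task1, 3), localTopN(task2, 3)), 3);
        System.out.println(top3); // [/home, /news, /d]
    }
}
```

With parallelism p, the windowAll operator only ever sees 3 * p rows per window instead of one row per distinct URL, which addresses both the network and CPU concerns from the question.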