I have a streaming input, say stock price data (covering multiple stocks), and I want to rank the stocks by price every minute. The ranking is based on the latest price of every stock and must include all stocks, whether or not a stock was updated during the previous minute. I tried to use ORDER BY in Flink streaming SQL.
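To pin down the intended semantics, here is a minimal plain-Python model. It is not Flink code, and `rank_snapshots` and the `(timestamp_seconds, symbol, price)` tuple layout are hypothetical. It keeps the latest price per stock and, at every minute boundary, emits all known stocks ranked by that price, including stocks that were not updated during that minute.

```python
from typing import Dict, List, Tuple

Snapshot = Tuple[int, List[Tuple[str, float]]]

def rank_snapshots(updates: List[Tuple[int, str, float]], interval: int = 60) -> List[Snapshot]:
    """updates: (timestamp_seconds, symbol, price), assumed sorted by timestamp.
    Returns (boundary, ranking) pairs, ranking = all symbols by latest price, highest first."""
    latest: Dict[str, float] = {}  # latest price per symbol, kept across minutes
    snapshots: List[Snapshot] = []
    if not updates:
        return snapshots
    next_boundary = (updates[0][0] // interval + 1) * interval
    for ts, symbol, price in updates:
        # Emit a ranking for every minute boundary passed before this update
        while ts >= next_boundary:
            ranked = sorted(latest.items(), key=lambda kv: kv[1], reverse=True)
            snapshots.append((next_boundary, ranked))
            next_boundary += interval
        latest[symbol] = price
    # Ranking for the last (possibly partial) minute
    ranked = sorted(latest.items(), key=lambda kv: kv[1], reverse=True)
    snapshots.append((next_boundary, ranked))
    return snapshots
```

For example, with updates for AAPL at 5 s and 70 s and for MSFT at 20 s, the snapshot at 120 s still contains MSFT even though it was not updated in that minute.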

I could not get this to work, and I am confused about two things:

  1. Why can ORDER BY only use a time attribute as the primary sort key, and why does it only support ASC? How can I order by another attribute, such as price?

  2. What does the SQL below (from the Flink documentation) mean? There is no window, so I assume the query is executed immediately for each incoming order, in which case it seems meaningless to sort a single element.

SELECT *
FROM Orders
ORDER BY orderTime

[Update]: When I read the code of ProcTimeSortProcessFunction.scala, it seems that Flink buffers the elements received within one millisecond and sorts them together.
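The buffering described in the update can be modeled roughly as follows. This is a simplified plain-Python sketch, not Flink's actual `ProcTimeSortProcessFunction`, and `proctime_sort` and `secondary_key` are hypothetical names. Rows that arrive in the same millisecond are collected and emitted together, optionally sorted by a secondary key. With only `ORDER BY orderTime`, the output is therefore simply arrival order.

```python
from itertools import groupby
from typing import Callable, List, Optional, Tuple

def proctime_sort(arrivals: List[Tuple[int, dict]],
                  secondary_key: Optional[Callable[[dict], object]] = None) -> List[dict]:
    """arrivals: (arrival_time_ms, row) in arrival order (non-decreasing times)."""
    out: List[dict] = []
    # Group consecutive rows that share the same arrival millisecond
    for _, batch in groupby(arrivals, key=lambda a: a[0]):
        rows = [row for _, row in batch]
        if secondary_key is not None:
            rows.sort(key=secondary_key)  # stable sort: ties keep arrival order
        out.extend(rows)
    return out
```

The sort only reorders rows within one millisecond. A row arriving a millisecond later is emitted afterwards, even if its secondary key is smaller.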

Finally, is there a way to implement my logic in SQL?

1 Answer


ORDER BY is difficult to compute in streaming queries. A newly arriving record that belongs at the beginning of the result table would force us to update the whole result that has already been emitted. Therefore, we only support ORDER BY on a time attribute (in ascending order), because then we can guarantee that the results have (roughly) increasing timestamps.
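Why ascending time order is cheap can be shown with a small plain-Python model of an event-time sort. This is a sketch, not Flink's implementation, and `rowtime_sort` and the event encoding are hypothetical. Rows are buffered until a watermark passes their timestamp and are then emitted in timestamp order. Because no row at or before the watermark can still arrive (late rows are dropped in this model), the output is append-only and never needs to be revised.

```python
import heapq
from typing import Any, List, Optional, Tuple

def rowtime_sort(events: List[Tuple[str, Any]]) -> List[Tuple[int, Any]]:
    """events: ("row", (ts, payload)) or ("watermark", ts), in arrival order."""
    buffer: List[Tuple[int, int, Any]] = []  # min-heap ordered by timestamp
    out: List[Tuple[int, Any]] = []
    watermark: Optional[int] = None
    seq = 0  # insertion counter: tie-breaker so payloads are never compared
    for kind, value in events:
        if kind == "row":
            ts, payload = value
            if watermark is not None and ts <= watermark:
                continue  # late row: dropped in this model
            heapq.heappush(buffer, (ts, seq, payload))
            seq += 1
        elif kind == "watermark":
            watermark = value
            # Everything up to the watermark is complete and can be emitted in order
            while buffer and buffer[0][0] <= watermark:
                ts, _, payload = heapq.heappop(buffer)
                out.append((ts, payload))
    return out
```

Sorting by a non-time field such as price has no equivalent of the watermark: a later, lower price would belong before rows that were already emitted.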

In the future (Flink 1.6 or later), we will also support some queries like ORDER BY x ASC LIMIT 10, which will result in an updating table that contains the records with the 10 smallest x values.
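What an "updating table" for `ORDER BY x ASC LIMIT 10` means can be sketched in plain Python. This is a model of the semantics, not Flink's implementation, and `topk_changelog` is a hypothetical name. The k smallest values are maintained, and each change is reported as an insertion (`"+"`) or a retraction (`"-"`), in the spirit of Flink's retract streams, where each record is flagged as an add or a retraction.

```python
import bisect
from typing import List, Tuple

def topk_changelog(values: List[int], k: int) -> List[Tuple[str, int]]:
    """Return the changes needed to keep a copy of the k smallest values up to date."""
    top: List[int] = []  # sorted ascending, at most k entries
    changes: List[Tuple[str, int]] = []
    for x in values:
        if len(top) < k:
            bisect.insort(top, x)
            changes.append(("+", x))
        elif x < top[-1]:
            evicted = top.pop()  # largest value currently in the top k
            changes.append(("-", evicted))
            bisect.insort(top, x)
            changes.append(("+", x))
        # otherwise x does not enter the top k and the result is unchanged
    return changes
```

For example, with k = 2 the input 5, 3, 8, 1, 9 produces `+5, +3, -5, +1`. The value 1 displaces 5, which must be retracted from the result.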

Anyway, you cannot (easily) compute a top-k ranking per minute with a GROUP BY tumbling window. A GROUP BY query aggregates the records of each group (and of each window, in the case of GROUP BY TUMBLE(rtime, INTERVAL '1' MINUTE)) into a single record. So there won't be multiple records per minute, just one.
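The collapsing effect of a tumbling-window GROUP BY can be modeled in a few lines of plain Python. This sketch assumes a `MIN(price)` aggregate, and `tumble_min` is a hypothetical name. Each row falls into exactly one one-minute window, and each window yields a single value, so the individual rows, and any ranking among them, are gone.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def tumble_min(rows: List[Tuple[int, float]], size: int = 60) -> Dict[int, float]:
    """rows: (timestamp_seconds, price). Returns {window_start: MIN(price)}."""
    windows: Dict[int, List[float]] = defaultdict(list)
    for ts, price in rows:
        window_start = ts - ts % size  # start of the tumbling window containing ts
        windows[window_start].append(price)
    # One aggregated value per window, no matter how many rows it contained
    return {start: min(prices) for start, prices in sorted(windows.items())}
```

Three rows at 5 s, 30 s and 65 s produce only two results, one per window.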

If you'd like a query to compute the top 10 on field a per minute, you would need a query similar to this one:

SELECT a, b, c
FROM (
  SELECT
    a, b, c,
    -- rank rows within each minute by ascending a
    RANK() OVER (PARTITION BY CEIL(t TO MINUTE) ORDER BY a) AS rnk
  FROM yourTable)
WHERE rnk <= 10

However, such queries are not yet supported by Flink (version 1.4) because the time attribute is used in the PARTITION BY clause and not the ORDER BY clause of the OVER window.
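Even though Flink 1.4 cannot run it, the intended result of the query can be modeled in plain Python. This is a sketch with hypothetical names (`rank_per_minute`, rows as `(t_seconds, a)` pairs) that omits columns b and c. Rows are partitioned by the minute that `CEIL(t TO MINUTE)` yields and ranked by ascending `a` with SQL `RANK()` semantics: ties share a rank and the next rank is skipped. Rows with rank at most n are kept.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def rank_per_minute(rows: List[Tuple[int, float]], n: int = 10) -> List[Tuple[int, float, int]]:
    """Returns (minute, a, rank) for every row whose per-minute rank is <= n."""
    partitions: Dict[int, List[float]] = defaultdict(list)
    for t, a in rows:
        minute = -(-t // 60) * 60  # CEIL(t TO MINUTE), with t in seconds
        partitions[minute].append(a)
    result: List[Tuple[int, float, int]] = []
    for minute in sorted(partitions):
        values = sorted(partitions[minute])
        for a in values:
            # RANK() = 1 + number of rows in the partition with a strictly smaller a
            rank = values.index(a) + 1
            if rank <= n:
                result.append((minute, a, rank))
    return result
```

Because of ties, a top-n filter on `RANK()` can return more than n rows per minute. With n = 2 and the values 1, 3, 3, 5 in one minute, three rows survive.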