4
votes

I need to calculate the median of many parameters received from a Kafka stream over a 15-minute time window.

I couldn't find any built-in function for this, but I found a way to do it with a custom WindowFunction.

My questions are:

  1. Is this a difficult task for Flink? The data can be very large.
  2. If the data reaches gigabytes, will Flink store everything in memory until the end of the time window? (One of the arguments to the WindowFunction's apply implementation is an Iterable, a collection of all the data that arrived during the time window.)

Thanks

1
Can you tell us something about the incoming data? Are we talking about integers or floats? If integers, do you have a known range? At the end of the day the problem is sorting a large amount of numbers. If we are just talking about gigabytes, it sounds like you can handle it in memory. If not, you can think about something like an external merge sort (which is quite easy with Flink). – TobiSH
If you are not interested in an exact result, you could also think about an estimate: CountMinSketch might be your friend. – TobiSH
The data is log events from Kafka. Basically these are about 20 load-time metrics with a range of 0 to 30000 (int), usually smaller than 10000. I need to aggregate by thousands of dimension combinations, such as website, page, browser, and more, and calculate the average, median, and percentiles. I cannot find any documentation for count-min-sketch. The Table API also looks interesting, but I didn't find median there either. Thanks! – Lior Goldemberg
There has been some discussion of implementing the count min sketch here: issues.apache.org/jira/browse/FLINK-2147 – David Anderson
I wish Flink's documentation had more examples. – Lior Goldemberg

1 Answer

0
votes

Your question contains several aspects, but let me answer the most fundamental one:

Is this a hard task for Flink, why is this not a standard example?

Yes, the median is a hard metric for streaming, because the only way to determine it exactly is to keep the full data.

Many statistics don't need the full data to be calculated. For instance:

  • If you have the total sum, you can take the previous total sum and add the latest observation.
  • If you have the total count, you add 1 and have the new total count.
  • If you have the average, under the hood you can just keep track of the total sum and count, and at any point compute the new average from those two values.

This can even be done with more complicated metrics, like the standard deviation.
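To make this concrete, here is a minimal sketch in plain Python (no Flink specifics; the class name is illustrative) of such incremental, one-pass statistics. Welford's algorithm maintains the mean and variance from a few running values, without keeping any of the observations:

```python
class RunningStats:
    """Incremental sum, count, mean and variance (Welford's algorithm)."""

    def __init__(self):
        self.count = 0
        self.total = 0.0
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def add(self, x):
        """Fold one new observation into the running statistics."""
        self.count += 1
        self.total += x
        delta = x - self.mean
        self.mean += delta / self.count
        self._m2 += delta * (x - self.mean)

    @property
    def variance(self):
        return self._m2 / self.count if self.count else 0.0


stats = RunningStats()
for x in [2, 4, 4, 4, 5, 5, 7, 9]:
    stats.add(x)
print(stats.mean, stats.variance)  # 5.0 4.0
```

Note that the state is a handful of numbers regardless of how many observations arrive, which is exactly why such metrics fit a streaming window so well.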

However, there is no such shortcut for the median: the only way to know the median after adding a new observation is to look at all observations and find the middle one.
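In contrast to the incremental metrics above, an exact median forces you to buffer every observation in the window. A sketch in plain Python (no Flink specifics) of what the window function would have to do:

```python
def exact_median(window_values):
    """Exact median: needs every observation from the window in memory."""
    values = sorted(window_values)  # full data, O(n log n)
    n = len(values)
    mid = n // 2
    if n % 2:  # odd count: the single middle element
        return values[mid]
    return (values[mid - 1] + values[mid]) / 2  # even count: mean of the two middle


print(exact_median([9, 2, 5, 4, 7]))     # 5
print(exact_median([9, 2, 5, 4, 7, 4]))  # 4.5
```

The sorted buffer grows with the window, which is the memory concern raised in the question.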

As such, it is a challenging metric, and the full size of the incoming data will need to be handled. As mentioned in the comments, approximate approaches may be in the works, e.g. https://issues.apache.org/jira/browse/FLINK-2147

Alternatively, you could look at how your data is distributed, and perhaps estimate the median from metrics like the mean, skew, and kurtosis.

A final approach, if an approximate value is enough, is to pick a few 'candidates' and count the fraction of observations below each of them. The candidate closest to 50% would then be a reasonable estimate of the median.
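That candidate idea can be sketched as follows in plain Python (the candidate values and data are made-up illustrations). The key point is that it needs only one counter per candidate rather than the full window:

```python
def estimate_median(stream, candidates):
    """Approximate median: per candidate, track the fraction of values below it.

    Keeps one counter per candidate instead of buffering the observations.
    """
    below = {c: 0 for c in candidates}
    total = 0
    for x in stream:
        total += 1
        for c in candidates:
            if x < c:
                below[c] += 1
    # pick the candidate whose below-fraction is closest to 50%
    return min(candidates, key=lambda c: abs(below[c] / total - 0.5))


# load times in the 0..30000 range described in the question; candidates are guesses
data = [120, 300, 450, 800, 950, 1200, 2500, 4000, 8000, 9500]
print(estimate_median(data, candidates=[500, 1000, 2000, 5000]))  # 1000
```

The accuracy depends entirely on how well the candidates bracket the true median, so this works best when you already know the rough range of the data, as in the question's 0–30000 load times.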