234
votes

Possible Duplicate:
Rolling median algorithm in C

Given that integers are read from a data stream. Find median of elements read so far in efficient way.

Solution I have read: We can use a max heap on left side to represent elements that are less than the effective median, and a min heap on right side to represent elements that are greater than the effective median.

After processing an incoming element, the number of elements in heaps differ at most by 1 element. When both heaps contain the same number of elements, we find the average of heap's root data as effective median. When the heaps are not balanced, we select the effective median from the root of heap containing more elements.

But how would we construct a max heap and min heap i.e. how would we know the effective median here? I think that we would insert 1 element in max-heap and then the next 1 element in min-heap, and so on for all the elements. Correct me If I am wrong here.

9
Clever algorithm, using heaps. From the title I couldn't immediately think of a solution.Mooing Duck
vizier's solution looks good to me, except that I was assuming (though you did not state) that this stream could be arbitrarily long, so you couldn't keep everything in memory. Is that the case?Running Wild
@RunningWild For arbitrarily long streams, you could get the median of the last N elements by using Fibonacci heaps (so you get log(N) deletes) and storing pointers to inserted elements in order (in e.g. a deque), then removing the oldest element at each step once the heaps are full (maybe also moving things from one heap to the other). You could get somewhat better than N by storing the numbers of repeated elements (if there are lots of repeats), but in general, I think you have to make some kind of distributional assumptions if you want the median of the whole stream.Danica
You can start with both heaps empty. First int goes in one heap; second goes either in the other, or you move the first item to the other heap and then insert. This generalizes to "don't allow one heap to go bigger than the other +1" and no special casing is needed (the "root value" of an empty heap can be defined as 0)Jon Watte
I JUST got this question on a MSFT interview. Thank you for postingR Claven

9 Answers

395
votes

There are a number of different solutions for finding running median from streamed data, I will briefly talk about them at the very end of the answer.

The question is about the details of the a specific solution (max heap/min heap solution), and how heap based solution works is explained below:

For the first two elements add smaller one to the maxHeap on the left, and bigger one to the minHeap on the right. Then process stream data one by one,

Step 1: Add next item to one of the heaps

   if next item is smaller than maxHeap root add it to maxHeap,
   else add it to minHeap

Step 2: Balance the heaps (after this step heaps will be either balanced or
   one of them will contain 1 more item)

   if number of elements in one of the heaps is greater than the other by
   more than 1, remove the root element from the one containing more elements and
   add to the other one

Then at any given time you can calculate median like this:

   If the heaps contain equal amount of elements;
     median = (root of maxHeap + root of minHeap)/2
   Else
     median = root of the heap with more elements

Now I will talk about the problem in general as promised in the beginning of the answer. Finding running median from a stream of data is a tough problem, and finding an exact solution with memory constraints efficiently is probably impossible for the general case. On the other hand, if the data has some characteristics we can exploit, we can develop efficient specialized solutions. For example, if we know that the data is an integral type, then we can use counting sort, which can give you a constant memory constant time algorithm. Heap based solution is a more general solution because it can be used for other data types (doubles) as well. And finally, if the exact median is not required and an approximation is enough, you can just try to estimate a probability density function for the data and estimate median using that.

59
votes

If the variance of the input is statistically distributed (e.g. normal, log-normal, etc.) then reservoir sampling is a reasonable way of estimating percentiles/medians from an arbitrarily long stream of numbers.

int n = 0;  // Running count of elements observed so far  
#define SIZE 10000
int reservoir[SIZE];  

while(streamHasData())
{
  int x = readNumberFromStream();

  if (n < SIZE)
  {
       reservoir[n++] = x;
  }         
  else 
  {
      int p = random(++n); // Choose a random number 0 >= p < n
      if (p < SIZE)
      {
           reservoir[p] = x;
      }
  }
}

"reservoir" is then a running, uniform (fair), sample of all input - regardless of size. Finding the median (or any percentile) is then a straight-forward matter of sorting the reservoir and polling the interesting point.

Since the reservoir is fixed size, the sort can be considered to be effectively O(1) - and this method runs with both constant time and memory consumption.

52
votes

If you can't hold all the items in memory at once, this problem becomes much harder. The heap solution requires you to hold all the elements in memory at once. This is not possible in most real world applications of this problem.

Instead, as you see numbers, keep track of the count of the number of times you see each integer. Assuming 4 byte integers, that's 2^32 buckets, or at most 2^33 integers (key and count for each int), which is 2^35 bytes or 32GB. It will likely be much less than this because you don't need to store the key or count for those entries that are 0 (ie. like a defaultdict in python). This takes constant time to insert each new integer.

Then at any point, to find the median, just use the counts to determine which integer is the middle element. This takes constant time (albeit a large constant, but constant nonetheless).

31
votes

The most efficient way to calculate a percentile of a stream that I have found is the P² algorithm: Raj Jain, Imrich Chlamtac: The P² Algorithm for Dynamic Calculation of Quantiiles and Histograms Without Storing Observations. Commun. ACM 28(10): 1076-1085 (1985)

The algorithm is straight forward to implement and works extremely well. It is an estimate, however, so keep that in mind. From the abstract:

A heuristic algorithm is proposed for dynamic calculation qf the median and other quantiles. The estimates are produced dynamically as the observations are generated. The observations are not stored; therefore, the algorithm has a very small and fixed storage requirement regardless of the number of observations. This makes it ideal for implementing in a quantile chip that can be used in industrial controllers and recorders. The algorithm is further extended to histogram plotting. The accuracy of the algorithm is analyzed.

29
votes

If we want to find the median of the n most recently seen elements, this problem has an exact solution that only needs the n most recently seen elements to be kept in memory. It is fast and scales well.

An indexable skiplist supports O(ln n) insertion, removal, and indexed search of arbitrary elements while maintaining sorted order. When coupled with a FIFO queue that tracks the n-th oldest entry, the solution is simple:

class RunningMedian:
    'Fast running median with O(lg n) updates where n is the window size'

    def __init__(self, n, iterable):
        self.it = iter(iterable)
        self.queue = deque(islice(self.it, n))
        self.skiplist = IndexableSkiplist(n)
        for elem in self.queue:
            self.skiplist.insert(elem)

    def __iter__(self):
        queue = self.queue
        skiplist = self.skiplist
        midpoint = len(queue) // 2
        yield skiplist[midpoint]
        for newelem in self.it:
            oldelem = queue.popleft()
            skiplist.remove(oldelem)
            queue.append(newelem)
            skiplist.insert(newelem)
            yield skiplist[midpoint]

Here are links to complete working code (an easy-to-understand class version and an optimized generator version with the indexable skiplist code inlined):

19
votes

An intuitive way to think about this is that if you had a full balanced binary search tree, then the root would be the median element, since there there would be the same number of smaller and greater elements. Now, if the tree isn't full this won't be quite the case since there will be elements missing from the last level.

So what we can do instead is have the median, and two balanced binary trees, one for elements less than the median, and one for elements greater than the median. The two trees must be kept at the same size.

When we get a new integer from the data stream, we compare it to the median. If it's greater than the median, we add it to the right tree. If the two tree sizes differ more than 1, we remove the min element of the right tree, make it the new median, and put the old median in the left tree. Similarly for smaller.

8
votes

Efficient is a word that depends on context. The solution to this problem depends on the amount of queries performed relative to the amount of insertions. Suppose you are inserting N numbers and K times towards the end you were interested in the median. The heap based algorithm's complexity would be O(N log N + K).

Consider the following alternative. Plunk the numbers in an array, and for each query, run the linear selection algorithm (using the quicksort pivot, say). Now you have an algorithm with running time O(K N).

Now if K is sufficiently small (infrequent queries), the latter algorithm is actually more efficient and vice versa.

0
votes

Here is my simple but efficient algorithm (in C++) for calculating running median from a stream of integers:

#include<algorithm>
#include<fstream>
#include<vector>
#include<list>

using namespace std;

void runningMedian(std::ifstream& ifs, std::ofstream& ofs, const unsigned bufSize) {
    if (bufSize < 1)
        throw exception("Wrong buffer size.");
    bool evenSize = bufSize % 2 == 0 ? true : false;
    list<int> q;
    vector<int> nums;
    int n;
    unsigned count = 0;
    while (ifs.good()) {
        ifs >> n;
        q.push_back(n);
        auto ub = std::upper_bound(nums.begin(), nums.end(), n);
        nums.insert(ub, n);
        count++;
        if (nums.size() >= bufSize) {
            auto it = std::find(nums.begin(), nums.end(), q.front());
            nums.erase(it);
            q.pop_front();
            if (evenSize)
                ofs << count << ": " << (static_cast<double>(nums[nums.size() / 2 - 1] +
                static_cast<double>(nums[nums.size() / 2]))) / 2.0 << '\n';
            else
                ofs << count << ": " << static_cast<double>(nums[nums.size() / 2]);
        }
    }
}

The bufferSize specifies the size of the numbers sequence, on which the running median must be calculated. When reading numbers from the input stream ifs the vector of the size bufferSize is maintained in sorted order. The median is calculated by taking the middle of the sorted vector, if bufferSize is odd, or the sum of the two middle elements divided by 2, when bufferSize is even. Additinally, I maintain a list of last bufferSize elements read from input. When a new element is added, I put it in the right place in sorted vector and remove from the vector the element added bufferSize steps before (the value of the element retained in the front of the list). In the same time I remove the old element from the list: every new element is placed on the back of the list, every old element is removed from the front. After reaching the bufferSize, both the list and the vector stop to grow, and every insertion of a new element is compensated be deletion of an old element, placed in the list bufferSize steps before. Note, I do not care, whether I remove from the vector exactly the element, placed bufferSize steps before, or just an element that has the same value. For the value of median it does not matter. All calculated median values are output in the output stream.

-2
votes

Can't you do this with just one heap? Update: no. See the comment.

Invariant: After reading 2*n inputs, the min-heap holds the n largest of them.

Loop: Read 2 inputs. Add them both to the heap, and remove the heap's min. This reestablishes the invariant.

So when 2n inputs have been read, the heap's min is the nth largest. There'll need to be a little extra complication to average the two elements around the median position and to handle queries after an odd number of inputs.