3 votes

I need a very fast algorithm for the following task. I have already implemented several algorithms that complete it, but they're all too slow for the performance I need. It should be fast enough that the algorithm can be run at least 100,000 times a second on a modern CPU. It will be implemented in C++.

I am working with spans/ranges, a structure that has a start and an end coordinate on a line.

I have two vectors (dynamic arrays) of spans and I need to merge them. One vector is src and the other dst. The vectors are sorted by span start coordinates, and the spans do not overlap within one vector.

The spans in the src vector must be merged with the spans in the dst vector, such that the resulting vector is still sorted and has no overlaps. I.e., if overlaps are detected during the merge, the two spans are merged into one. (Merging two spans is just a matter of changing the coordinates in the structure.)

Now, there is one more catch, the spans in the src vector must be "widened" during the merge. This means that a constant will be added to the start and another (larger) constant to the end coordinate of every span in src. This means that after the src spans are widened they might overlap.
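To make the widening concrete, here is a minimal sketch (the `Span` struct, the `widen` helper, and the constants are mine, chosen to match the description above): widening can push the start down and the end up, so two spans that were disjoint before can overlap afterwards.

```cpp
// Hypothetical span type matching the description: start/end coordinates
// on a line, with start <= end.
struct Span {
    int start, end;
};

// Widening as described: one constant is added to the start, a larger one
// to the end, so every span grows by (widen_end - widen_start).
inline Span widen(Span s, int widen_start, int widen_end)
{
    return Span{ s.start + widen_start, s.end + widen_end };
}

// Overlap test for two spans (touching counts as overlapping here, since
// touching spans can be coalesced into one).
inline bool overlaps(Span a, Span b)
{
    return a.start <= b.end && b.start <= a.end;
}
```

For example, widening `[0,4]` and `[6,10]` by `(-1, +2)` yields `[-1,6]` and `[5,12]`, which now overlap and would be coalesced during the merge.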


What I have arrived at so far is that it cannot be done fully in-place; some kind of temporary storage is needed. I think it should be doable in linear time in the total number of elements in src and dst.

Any temporary storage can probably be shared between multiple runs of the algorithm.

The two primary approaches I have tried, which are too slow, are:

  1. Append all elements of src to dst, widening each element before appending it. Then run an in-place sort. Finally, iterate over the resulting vector using a "read" and a "write" pointer, with the read pointer running ahead of the write pointer, merging spans as it goes. When all elements have been merged (the read pointer reaches the end), dst is truncated.

  2. Create a temporary work-vector. Do a naive merge as described above by repeatedly picking the next element from either src or dst and merging into the work-vector. When done, copy the work-vector to dst, replacing it.

The first method has the problem that sorting is O((m+n)*log(m+n)) instead of O(m+n), and it has some overhead. It also means the dst vector has to grow much larger than it really needs to.

The second has the primary problem of a lot of copying around and, again, allocation/deallocation of memory.
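For reference, the second approach with the copying and reallocation removed can be sketched roughly as follows (names are mine; `tmp` is passed in so its capacity survives across calls, widening happens on the fly, and a final `swap` replaces the copy-back):

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

struct Span {
    int start, end;
};

// One linear pass: widen src spans on the fly, coalesce overlaps as they
// are produced, and swap the scratch buffer into dst at the end.
void merge_spans(const std::vector<Span>& src, std::vector<Span>& dst,
                 int widen_start, int widen_end, std::vector<Span>& tmp)
{
    tmp.clear();
    tmp.reserve(src.size() + dst.size());

    // Append s, coalescing with the previous span when they overlap/touch.
    auto push = [&tmp](Span s) {
        if (!tmp.empty() && s.start <= tmp.back().end)
            tmp.back().end = std::max(tmp.back().end, s.end);
        else
            tmp.push_back(s);
    };

    std::size_t i = 0, j = 0;
    while (i < src.size() || j < dst.size()) {
        bool take_src = j == dst.size() ||
            (i < src.size() && src[i].start + widen_start < dst[j].start);
        if (take_src) {
            push(Span{ src[i].start + widen_start, src[i].end + widen_end });
            ++i;
        } else {
            push(dst[j]);
            ++j;
        }
    }
    dst.swap(tmp);  // O(1) pointer exchange instead of a copy-back
}
```

Since widening adds the same constants to every src span, the widened sequence is still sorted, so a single merge pass suffices.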

The data structures used for storing/managing the spans/vectors can be altered if you think that's needed.

Update: I forgot to say how large the datasets are. The most common cases are between 4 and 30 elements in either vector, and either dst is empty or there is a large amount of overlap between the spans in src and dst.


9 Answers

8 votes

We know that the absolute best-case runtime is O(m+n), because you at least have to scan over all of the data in order to merge the lists. Given this, your second method should give you that type of behavior.

Have you profiled your second method to find out where the bottlenecks are? It is quite possible that, depending on the amount of data you are talking about, it is actually impossible to do what you want in the specified amount of time. One way to verify this is to do something simple, like summing up all the start and end values of the spans in each vector in a loop, and timing that. Basically, you are then doing a minimal amount of work for each element in the vectors. This will give you a baseline for the best performance you can expect.
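The baseline measurement suggested above might be sketched like this (the `Span` struct and helper names are mine):

```cpp
#include <chrono>
#include <vector>

struct Span {
    int start, end;
};

// The cheapest conceivable pass that still touches every element. If even
// this blows the time budget, no merge algorithm will fit it.
long long baseline_sum(const std::vector<Span>& v)
{
    long long sum = 0;
    for (const Span& s : v)
        sum += s.start + s.end;
    return sum;
}

// Time 100,000 baseline passes and return the elapsed microseconds.
long long time_100k_runs(const std::vector<Span>& v)
{
    auto t0 = std::chrono::steady_clock::now();
    long long sink = 0;  // accumulate so the loop can't be optimised away
    for (int run = 0; run < 100000; ++run)
        sink += baseline_sum(v);
    auto t1 = std::chrono::steady_clock::now();
    long long us = std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0).count();
    return us + (sink & 0);  // fold sink in (adds 0) so it counts as used
}
```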

Beyond that, you can avoid copying the vectors element by element by using std::swap, and you can preallocate the temp vector to a sufficient size to avoid triggering an expansion of the array while you are merging the elements.

You might consider using two vectors in your system, and whenever you need to do a merge, merge into the unused vector and then swap (this is similar to double buffering in graphics). This way you don't have to reallocate the vectors every time you do a merge.
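A minimal sketch of that double-buffering idea (class and function names are mine): after the first few calls both buffers have grown to working size, and no further allocation happens.

```cpp
#include <vector>

// Keep two persistent vectors and always merge into the currently unused
// one, then flip which one is "active".
template <class T>
class DoubleBuffer {
    std::vector<T> bufs[2];
    int active = 0;

public:
    std::vector<T>& current() { return bufs[active]; }

    // merge_into(src, current, out) must write the merged result into out.
    template <class MergeFn>
    void merge(const std::vector<T>& src, MergeFn merge_into)
    {
        std::vector<T>& out = bufs[1 - active];
        out.clear();                 // keeps its capacity
        merge_into(src, bufs[active], out);
        active = 1 - active;         // flip buffers, O(1)
    }
};
```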

However, you are best off profiling first and finding out what your bottleneck is. If the allocations are minimal compared to the actual merging process, then you need to figure out how to make the merging itself faster.

Some additional speedup could come from accessing the vectors' raw data directly, which avoids the bounds checks on each access.

0 votes

The sort you mention in approach 1 can be reduced to linear time (from log-linear, as you describe it) because the two input lists are already sorted: just perform the merge step of merge sort. With an appropriate representation for the input span vectors (for example, singly-linked lists), this can be done in-place.

http://en.wikipedia.org/wiki/Merge_sort
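The in-place merge step with a linked representation might look like this (a sketch; the node type is mine, and overlap coalescing would be folded into the same pass):

```cpp
// Minimal singly-linked representation so the merge step of merge sort
// can relink nodes in place, with no temporary array.
struct Node {
    int start, end;
    Node* next;
};

// Classic sorted-merge by span start, relinking nodes in place.
Node* merge_lists(Node* a, Node* b)
{
    Node dummy{0, 0, nullptr};
    Node* tail = &dummy;
    while (a && b) {
        // Pick whichever head starts first; `pick` aliases a or b so the
        // chosen list is advanced after relinking.
        Node*& pick = (a->start <= b->start) ? a : b;
        tail->next = pick;
        tail = pick;
        pick = pick->next;
    }
    tail->next = a ? a : b;  // append the leftover list
    return dummy.next;
}
```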

0 votes

How about the second method without repeated allocation; in other words, allocate your temporary vector once and never allocate it again? Or, if the input vectors are small enough (but not of constant size), just use alloca instead of malloc.

Also, in terms of speed, you may want to make sure that your code is using CMOV for the sorting, since if the code actually takes a branch for every single iteration of the merge:

if(src1[x] < src2[x])
    dst[x] = src1[x];
else
    dst[x] = src2[x];

then branch prediction will fail 50% of the time, which will have an enormous impact on performance. A conditional move will likely do much, much better, so make sure the compiler is doing that, and if not, try to coax it into doing so.
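The easiest way to check is to read the disassembly, but you can also nudge the compiler by expressing the choice without control flow. A sketch (a plain ternary like `a < b ? a : b` usually compiles to CMOV at -O2 as well):

```cpp
// Branchless select: the condition is computed as a 0/1 integer and the
// result is formed by arithmetic, so no branch is needed at all.
inline int select_min(int a, int b)
{
    int take_a = a < b;               // 0 or 1
    return take_a * a + (1 - take_a) * b;
}
```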

0 votes

I don't think a strictly linear solution is possible, because widening the src vector spans may in the worst case cause all of them to overlap (depending on the magnitude of the constant that you are adding).

The problem may be in the implementation, not in the algorithm; I would suggest profiling the code of your prior solutions to see where the time is spent.

Reasoning:

For a truly "modern" CPU like the Intel Core 2 Extreme QX9770 running at 3.2 GHz, one can expect about 59,455 MIPS.

At 100,000 runs per second, you would have to process each run in about 594,550 instructions. That's a LOT of instructions.

ref: Wikipedia, MIPS

In addition, note that adding a constant to the src vector spans does not de-sort them, so you can widen the src vector spans independently and then merge them with the dst vector spans; this should reduce the workload of your original algorithm.

0 votes

Option 1 is right out: a full sort is slower than merging two sorted lists.

So you're looking at tweaking 2 (or something entirely new).

If you change the data structures to doubly linked lists, then you can merge them in constant working space.

Use a fixed-size heap allocator for the list nodes, both to reduce memory use per node and to improve the chance that the nodes are close together in memory, reducing page misses.
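A fixed-size pool for list nodes might be sketched like this (names are mine): all nodes live in one contiguous block, and freed nodes go onto an intrusive free list, so allocation and deallocation are a pointer pop and push, and the nodes stay close together in memory.

```cpp
#include <cstddef>
#include <vector>

struct Node {
    int start, end;
    Node* next;
};

class NodePool {
    std::vector<Node> storage;  // one contiguous block for all nodes
    Node* free_list;

public:
    explicit NodePool(std::size_t n) : storage(n), free_list(nullptr)
    {
        // Thread every node onto the free list up front.
        for (std::size_t i = 0; i + 1 < n; ++i)
            storage[i].next = &storage[i + 1];
        if (n > 0) {
            storage[n - 1].next = nullptr;
            free_list = &storage[0];
        }
    }

    Node* alloc()
    {
        Node* n = free_list;
        if (n)
            free_list = n->next;
        return n;  // nullptr when the pool is exhausted
    }

    void free(Node* n)
    {
        n->next = free_list;
        free_list = n;
    }
};
```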

You might be able to find code online or in your favourite algorithm book to optimise a linked list merge. You'll want to customise this in order to do span coalescing at the same time as the list merge.

To optimise the merge, first note that whenever a run of consecutive values comes from one list without any from the other in between, you can insert the whole run into the dst list in one go instead of inserting each node in turn. You can also save one write per insertion over a normal list operation by leaving the end "dangling" and patching it up later. And provided that you don't do deletions anywhere else in your app, the list can be singly-linked, which means one write per node.
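The run-splicing idea can be sketched as follows (a sketch only; the node type is mine, and span coalescing is omitted for brevity):

```cpp
struct Node {
    int start, end;
    Node* next;
};

// When several consecutive nodes come from the same list, attach the
// whole run with a single pointer write and leave its tail dangling
// until the next patch-up.
Node* merge_runs(Node* a, Node* b)
{
    Node dummy{0, 0, nullptr};
    Node* tail = &dummy;
    while (a && b) {
        if (a->start <= b->start) {
            tail->next = a;                         // one write per run
            while (a->next && a->next->start <= b->start)
                a = a->next;                        // skip to end of run
            tail = a;
            a = a->next;                            // tail left dangling...
        } else {
            tail->next = b;
            while (b->next && b->next->start <= a->start)
                b = b->next;
            tail = b;
            b = b->next;
        }
    }
    tail->next = a ? a : b;                         // ...and patched here
    return dummy.next;
}
```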

As for the 10-microsecond runtime: that kind of depends on n and m...

0 votes

I would always keep my vector of spans sorted. That makes implementing algorithms a LOT easier -- and possible to do in linear time.

OK, so I'd sort the spans based on:

  • span minimum in increasing order
  • then span maximum in decreasing order

You need to create a function to do that.
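That comparison function might look like this (a sketch, with a hypothetical `Span` type; pass it to `std::sort` or `std::set_union` as the comparator):

```cpp
struct Span {
    int start, end;
};

// The ordering described above: increasing minimum (start), then
// decreasing maximum (end), so a span sorts before all of its sub-spans.
inline bool span_less(const Span& a, const Span& b)
{
    if (a.start != b.start)
        return a.start < b.start;
    return a.end > b.end;  // wider span first on equal starts
}
```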

Then I'd use std::set_union to merge the vectors (you can merge more than one before continuing).

Then for each consecutive sets of spans with identical minimums, you keep the first and remove the rest (they're sub-spans of the first span).

Then you need to merge your spans. That should be pretty doable now and feasible in linear time.

OK, here's the trick now. Don't try to do this in-place. Use one or more temporary vectors (and reserve enough space ahead of time). Then at the end, call std::vector::swap to put the results in the input vector of your choice.
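The final swap is constant time, which is the point of the trick; a minimal sketch (names are mine):

```cpp
#include <vector>

// Build the result in a preallocated scratch vector, then exchange
// buffers in constant time instead of copying back.
void publish_result(std::vector<int>& dst, std::vector<int>& scratch)
{
    dst.swap(scratch);  // O(1): only internal pointers are exchanged
    scratch.clear();    // ready for reuse; its capacity is retained
}
```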

I hope that's enough to get you going.

0 votes

What is your target system? Is it multi-core? If so, you could consider multithreading this algorithm.

0 votes

I wrote a new container class just for this algorithm, tailored to the needs. This also gave me a chance to adjust other code around my program which got a little speed boost at the same time.

This is significantly faster than the old implementation using STL vectors, which was otherwise basically the same thing. But while it's faster, it's still not really fast enough... unfortunately.

Profiling no longer reveals what the real bottleneck is. The MSVC profiler seems to sometimes place the "blame" on the wrong calls (supposedly identical runs are assigned widely different running times), and most calls get coalesced into one big chunk.

Looking at a disassembly of the generated code, I see a very large number of jumps; I think that might now be the main reason for the slowness.

class SpanBuffer {
private:
    int *data;
    size_t allocated_size;
    size_t count;

    inline void EnsureSpace()
    {
        if (count == allocated_size)
            Reserve(count*2);
    }

public:
    struct Span {
        int start, end;
    };

public:
    SpanBuffer()
        : data(0)
        , allocated_size(24)
        , count(0)
    {
        data = new int[allocated_size];
    }

    SpanBuffer(const SpanBuffer &src)
        : data(0)
        , allocated_size(src.allocated_size)
        , count(src.count)
    {
        data = new int[allocated_size];
        memcpy(data, src.data, sizeof(int)*count);
    }

    ~SpanBuffer()
    {
        delete [] data;
    }

    inline void AddIntersection(int x)
    {
        EnsureSpace();
        data[count++] = x;
    }

    inline void AddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);
        // EnsureSpace() only guarantees room for one more int, but two are
        // written here; reserve explicitly to avoid overrunning by one.
        if (count + 2 > allocated_size)
            Reserve((count + 2) * 2);
        data[count] = s;
        data[count+1] = e;
        count += 2;
    }

    inline void Clear()
    {
        count = 0;
    }

    inline size_t GetCount() const
    {
        return count;
    }

    inline int GetIntersection(size_t i) const
    {
        return data[i];
    }

    inline const Span * GetSpanIteratorBegin() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data);
    }

    inline Span * GetSpanIteratorBegin()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data);
    }

    inline const Span * GetSpanIteratorEnd() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data+count);
    }

    inline Span * GetSpanIteratorEnd()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data+count);
    }

    inline void MergeOrAddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);

        if (count == 0)
        {
            AddSpan(s, e);
            return;
        }

        int *lastspan = data + count-2;

        if (s > lastspan[1])
        {
            AddSpan(s, e);
        }
        else
        {
            if (s < lastspan[0])
                lastspan[0] = s;
            if (e > lastspan[1])
                lastspan[1] = e;
        }
    }

    inline void Reserve(size_t minsize)
    {
        if (minsize <= allocated_size)
            return;

        int *newdata = new int[minsize];

        memcpy(newdata, data, sizeof(int)*count);

        delete [] data;
        data = newdata;

        allocated_size = minsize;
    }

    inline void SortIntersections()
    {
        assert((count & 1) == 0);
        std::sort(data, data+count, std::less<int>());
        assert((count & 1) == 0);
    }

    inline void Swap(SpanBuffer &other)
    {
        std::swap(data, other.data);
        std::swap(allocated_size, other.allocated_size);
        std::swap(count, other.count);
    }
};


struct ShapeWidener {
    // How much to widen in the X direction
    int widen_by;
    // Half of width difference of src and dst (width of the border being produced)
    int xofs;

    // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call
    SpanBuffer buffer;

    inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst);

    // Note: widen_by is not set here; it must be assigned before
    // OverlayScanline is called.
    ShapeWidener(int _xofs) : widen_by(0), xofs(_xofs) { }
};


inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst)
{
    if (src.GetCount() == 0) return;  // nothing to merge in; dst stays as-is

    assert((src.GetCount() & 1) == 0);
    assert((dst.GetCount() & 1) == 0);

    assert(buffer.GetCount() == 0);

    dst.Swap(buffer);

    const int widen_s = xofs - widen_by;
    const int widen_e = xofs + widen_by;

    size_t resta = src.GetCount()/2;
    size_t restb = buffer.GetCount()/2;
    const SpanBuffer::Span *spa = src.GetSpanIteratorBegin();
    const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin();

    while (resta > 0 || restb > 0)
    {
        if (restb == 0)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else if (resta == 0)
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
        else if (spa->start < spb->start)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
    }

    buffer.Clear();
}

0 votes

If your most recent implementation still isn't fast enough, you may have to look at alternative approaches.

What are you using the outputs of this function for?