1
votes

In my current application I receive spectral data from a spectrometer. The data is accumulated for one second and then put into a circular buffer. For now I have one consumer, which pops entries from the buffer and saves everything to disk. All of that works. Now I need to add another consumer that, in parallel to the saving, does some processing on the spectra. So I have two consumers needing the exact same data (note: they only read and don't modify it). This doesn't work as it stands, because if one consumer pops an entry from the buffer it is gone, so the other never receives it. I guess the simplest solution is to give every consumer its own circular buffer. Fine, but the data entries are big: one entry has a maximum size of around 80MB, so to save memory it would be great not to have the same data there twice. Is there a better solution?

Note: I am using a circular buffer so that the buffer's growth is bounded.

2
If you had a circular buffer per consumer then it would be best if it held a shared_ptr to your data. That way you avoid any copies and you can guarantee that your data will remain valid as long as one of your consumers references it. - Mohamad Elghawi
The obvious requirement here is that the two consumers need to co-operate so they can share a pointer to the data, and that they complete at roughly the same time, or you'll lose concurrency. Copying the data does not solve that issue at all, since that just creates a firehose bug that will eventually blow the heap. - Hans Passant
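
The shared_ptr idea from the first comment could look roughly like the sketch below. It is only an illustration under assumptions: Spectrum, PtrQueue and publish are hypothetical names, the queue is a plain mutex-protected one rather than the asker's circular buffer, and std::optional requires C++17. Each consumer gets its own small queue of pointers, while the 80MB payload is allocated once.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for one second of accumulated spectral data.
struct Spectrum {
    std::vector<double> samples;
};

// Consumers only read, so they share a pointer to const data.
using SpectrumPtr = std::shared_ptr<const Spectrum>;

// Minimal mutex-protected bounded queue of pointers (one instance per consumer).
class PtrQueue {
public:
    explicit PtrQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Returns false when the queue is full; the caller decides what to do then.
    bool push(SpectrumPtr p) {
        std::lock_guard<std::mutex> lock(m_);
        if (q_.size() >= capacity_) return false;
        q_.push_back(std::move(p));
        return true;
    }

    // Returns an empty optional when there is nothing to read.
    std::optional<SpectrumPtr> pop() {
        std::lock_guard<std::mutex> lock(m_);
        if (q_.empty()) return std::nullopt;
        SpectrumPtr p = std::move(q_.front());
        q_.pop_front();
        return p;
    }

private:
    std::mutex m_;
    std::deque<SpectrumPtr> q_;
    std::size_t capacity_;
};

// Producer side: allocate the entry once, hand the same pointer to both queues.
inline bool publish(Spectrum s, PtrQueue& saver, PtrQueue& processor) {
    SpectrumPtr p = std::make_shared<const Spectrum>(std::move(s));
    bool a = saver.push(p);
    bool b = processor.push(p);
    return a && b;
}
```

The entry is freed automatically when the last consumer drops its pointer. As the second comment points out, this does not bound memory by itself if one consumer falls far behind, which is why the per-consumer queues are given a capacity here.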

2 Answers

2
votes

Keep two different tail pointers in your buffer, one for each consumer. When the producer updates the queue, it uses the tail pointer that lags furthest behind to check whether the buffer is full. Each consumer uses its own tail pointer to check whether the buffer is empty. This way we get a lock-free buffer, and there is no copying around of data.
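
A minimal sketch of this idea, assuming a single producer and a fixed number of consumers known at compile time. BroadcastRing and its member names are made up for illustration, and slots hold T by value, so for 80MB entries T would more likely be a pointer or a preallocated block:

```cpp
#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Single-producer ring buffer in which every consumer sees every entry.
// head_ and the tails are monotonically increasing counters; the slot index
// is the counter modulo the capacity.
template <typename T, std::size_t NumConsumers>
class BroadcastRing {
public:
    explicit BroadcastRing(std::size_t capacity) : slots_(capacity) {
        assert(capacity > 0);
        for (auto& t : tails_) t.store(0);
    }

    // Producer only. Fails when the slowest consumer is a full lap behind.
    bool try_push(T value) {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        if (head - slowest_tail() == slots_.size()) return false;  // full
        slots_[head % slots_.size()] = std::move(value);
        head_.store(head + 1, std::memory_order_release);  // publish the slot
        return true;
    }

    // Consumer `id` only. Returns nullptr when that consumer has caught up.
    const T* peek(std::size_t id) const {
        const std::size_t tail = tails_[id].load(std::memory_order_relaxed);
        if (tail == head_.load(std::memory_order_acquire)) return nullptr;  // empty
        return &slots_[tail % slots_.size()];
    }

    // Consumer `id` only. Call once the entry returned by peek() is no longer needed.
    void advance(std::size_t id) {
        tails_[id].fetch_add(1, std::memory_order_release);
    }

private:
    // The lagging tail decides whether a slot may be overwritten.
    std::size_t slowest_tail() const {
        std::size_t lowest = tails_[0].load(std::memory_order_acquire);
        for (std::size_t i = 1; i < NumConsumers; ++i)
            lowest = std::min(lowest, tails_[i].load(std::memory_order_acquire));
        return lowest;
    }

    std::vector<T> slots_;
    std::atomic<std::size_t> head_{0};
    std::array<std::atomic<std::size_t>, NumConsumers> tails_;
};
```

With two consumers, the saver would call peek(0) / advance(0) and the processor peek(1) / advance(1). A slot is reused only after both have advanced past it, so a stalled consumer eventually blocks the producer instead of losing data.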

See the implementation of the LMAX Disruptor for a discussion of the performance improvement from this approach.

1
votes

I should hope you're receiving your data directly into the queue and not copying it around much....

Any valid solution that keeps a single copy of the data has to synchronize the consumers, so that an entry is popped only once all of them are done with it.

You can keep your circular buffer. You only need a single remover to remove an entry when the readers are done with it. I strongly suggest that this remover be the writer of the data. That way it is the only guy with write access to the queue, which simplifies things.

The remover can be fed by the consumers, which tell it what they are done with.

Consumers can share their read offsets with the remover. You can use atomic_store on the consumer side and atomic_load on the remover side; for that, the offset has to be a std::atomic.

It should be something like this:

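#include <algorithm>
#include <atomic>
#include <climits>
#include <list>

struct Consumer {
  ...
  // Written only by this consumer, read by the remover.
  std::atomic<long> offset{0};
  ...
  Consumer() {
    q.remover->add(this);
  }
  ...
  void run() {
    for(;;) {
      entry& e = q.read( offset.load(std::memory_order_relaxed) );
      process( e );
      // Publish progress only after the entry has been processed.
      offset.fetch_add( e.size(), std::memory_order_release );
    }
  }
};

struct Remover {
  ...
  long remove_offset = 0;
  std::list<Consumer*> cons;
  ...
  void remove() {
    // With no consumers registered there is no read point to remove up to.
    if( cons.empty() ) return;
    // find lowest read point
    long cons_offset = LONG_MAX;
    for( auto p : cons ) {
      cons_offset = std::min( cons_offset, p->offset.load(std::memory_order_acquire) );
    }
    // remove up to that point
    while( cons_offset > remove_offset ) {
      entry& e = q.read(remove_offset);
      remove_offset += e.size();
      q.remove( e.size() );
    }
  }
};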