126
votes

I want to create an efficient circular buffer in python (with the goal of taking averages of the integer values in the buffer).

Is this an efficient way to use a list to collect values?

def add_to_buffer( self, num ):
    # Drop the oldest value from the front of the list, then append the
    # newest one at the back, so the list keeps a constant length.
    # Note: list.pop(0) is O(n) because the remaining items are shifted.
    self.mylist.pop( 0 )
    self.mylist.append( num )

What would be more efficient (and why)?

14
This is not an efficient way to implement a circular buffer because pop(0) is an O(n) operation on a list: it removes the first element, and all remaining elements have to be shifted to the left. Use collections.deque with the maxlen argument instead; deque has O(1) append and pop operations. – Vlad Bezden

14 Answers

236
votes

I would use collections.deque with a maxlen arg

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

There is a recipe in the docs for deque that is similar to what you want. My assertion that it's the most efficient rests entirely on the fact that it's implemented in C by an incredibly skilled crew that is in the habit of cranking out top notch code.

15
votes

Although there are already a great number of great answers here, I could not find any direct comparison of timings for the options mentioned. Therefore, please find my humble attempt at a comparison below.

For testing purposes only, the class can switch between a list-based buffer, a collections.deque-based buffer, and a Numpy.roll-based buffer.

Note that the update method adds only one value at a time, to keep it simple.

import numpy
import timeit
import collections


class CircularBuffer(object):
    """Fixed-size buffer with three interchangeable backends, for timing.

    The buffer always holds ``buffer_size`` values and starts out filled
    with zeros.  Each call to :meth:`update` discards the oldest value and
    stores the new one as the most recent element of ``content``.

    Backends (see ``buffer_methods``):
      * ``'list'``  -- plain list, ``append`` followed by ``pop(0)``
      * ``'deque'`` -- ``collections.deque`` with ``maxlen``
      * ``'roll'``  -- NumPy array shifted with ``numpy.roll``
    """
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        """Create a zero-filled buffer.

        :param buffer_size: number of values the buffer holds
        :param buffer_method: one of ``CircularBuffer.buffer_methods``
        :raises ValueError: if ``buffer_method`` is not a known backend
        """
        if buffer_method not in self.buffer_methods:
            raise ValueError('unknown buffer_method {!r}; expected one of {}'
                             .format(buffer_method, self.buffer_methods))
        self.size = buffer_size
        self.method = buffer_method
        # Allocate the storage up front.  Allocating lazily inside update()
        # would swallow the first value, which is easy to miss.
        if buffer_method == self.buffer_methods[0]:
            self.content = [0.] * buffer_size
        elif buffer_method == self.buffer_methods[1]:
            self.content = collections.deque([0.] * buffer_size,
                                             maxlen=buffer_size)
        else:
            self.content = numpy.zeros(buffer_size, dtype=float)

    def update(self, scalar):
        """Push one value into the buffer, discarding the oldest one."""
        if self.method == self.buffer_methods[0]:
            # Use list
            self.content.append(scalar)
            self.content.pop(0)
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque; maxlen drops the oldest item for us
            self.content.append(scalar)
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll; a zero-length buffer has no slot to write to
            if len(self.content):
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar

# Testing and Timing
# Builds one buffer per backend, checks that each ends up holding
# 0 .. circular_buffer_size - 1 after one fill pass, then times that fill.
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                   buffer_method=method)
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing: run the exact statement that is timed below.  The string is
    # built locally from trusted values, so eval() is safe here.
    eval(code)
    buffer_content = list(cb.content)
    # list(range(...)) keeps the comparison meaningful on Python 3, where a
    # list never compares equal to a range object.
    assert buffer_content == list(range(circular_buffer_size))
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    # Single-argument print(...) produces the same output on Python 2 and 3.
    print('{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3))

On my system this yields:

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)
14
votes

popping from the head of a list causes the whole list to be copied, so is inefficient

You should instead use a list/array of fixed size and an index which moves through the buffer as you add/remove items

10
votes

Based on MoonCactus's answer, here is a circularlist class. The difference with his version is that here c[0] will always give the oldest-appended element, c[-1] the latest-appended element, c[-2] the penultimate... This is more natural for applications.

# Usage example.  The trailing comment on each line shows the expected
# representation of the buffer, followed by c[0] (oldest) and c[-1] (newest).
c = circularlist(4)
c.append(1); print(c, c[0], c[-1])    #[1] (1/4 items)              1  1
c.append(2); print(c, c[0], c[-1])    #[1, 2] (2/4 items)           1  2
c.append(3); print(c, c[0], c[-1])    #[1, 2, 3] (3/4 items)        1  3
c.append(8); print(c, c[0], c[-1])    #[1, 2, 3, 8] (4/4 items)     1  8
c.append(10); print(c, c[0], c[-1])   #[2, 3, 8, 10] (4/4 items)    2  10
c.append(11); print(c, c[0], c[-1])   #[3, 8, 10, 11] (4/4 items)   3  11
# Seeding with more items than fit keeps only the last `size` of them.
d = circularlist(4, [1, 2, 3, 4, 5])  #[2, 3, 4, 5]

Class:

class circularlist(object):
    """Fixed-capacity list that overwrites its oldest element when full.

    Indexing is relative to age: ``c[0]`` is the oldest stored element and
    ``c[-1]`` the most recently appended one.
    """

    def __init__(self, size, data=()):
        """Initialization

        :param size: maximum number of elements kept (must be >= 1)
        :param data: optional initial elements; only the last ``size`` of
            them are kept
        :raises ValueError: if ``size`` is smaller than 1
        """
        if size < 1:
            raise ValueError('size must be at least 1')
        self.size = size
        self._data = list(data)[-size:]
        # `index` is the slot the next append writes to.  While the buffer
        # is still filling that is len(_data); once full it wraps around to
        # the oldest element.  Starting at 0 regardless of the seed data
        # would corrupt the ordering once a partially seeded buffer fills.
        self.index = len(self._data) % size

    def append(self, value):
        """Append an element, overwriting the oldest one when full"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index

        Once the buffer is full, the key is taken modulo ``size``.
        """
        if len(self._data) == self.size:
            return self._data[(key + self.index) % self.size]
        return self._data[key]

    def __repr__(self):
        """Return string representation, oldest element first"""
        ordered = self._data[self.index:] + self._data[:self.index]
        return '{!r} ({}/{} items)'.format(ordered, len(self._data), self.size)
9
votes

Python's deque is slow. You can also use numpy.roll instead; see "How do you rotate the numbers in a numpy array of shape (n,) or (n,1)?"

In this benchmark, deque is 448ms. Numpy.roll is 29ms http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

8
votes

OK with the use of the deque class, but for the requirements of the question (average) this is my solution:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
4
votes

How about the solution from the Python Cookbook, including a reclassification of the ring buffer instance when it becomes full?

class RingBuffer:
    """Ring buffer in its initial, still-growing (not yet full) state."""

    def __init__(self, size_max):
        self.data = []
        self.max = size_max

    class __Full:
        """Behaviour the instance switches to once the buffer is full."""

        def append(self, x):
            """Store x over the oldest element and advance the cursor."""
            slot = self.cur
            self.data[slot] = x
            self.cur = (slot + 1) % self.max

        def get(self):
            """Return a new list of the elements, oldest first."""
            oldest_part = self.data[self.cur:]
            newest_part = self.data[:self.cur]
            return oldest_part + newest_part

    def append(self, x):
        """Add x at the end; switch to the full behaviour when max is reached."""
        self.data.append(x)
        if len(self.data) != self.max:
            return
        # The buffer just became full: from now on the instance behaves as
        # the nested full-buffer class, and this change is permanent.
        self.cur = 0
        self.__class__ = self.__Full

    def get(self):
        """Return the underlying list, which is ordered oldest to newest."""
        return self.data

# sample usage: fill a 5-slot buffer, then keep appending past its capacity
if __name__ == '__main__':
    x = RingBuffer(5)
    for item in (1, 2, 3, 4):
        x.append(item)
    print(x.__class__, x.get())
    x.append(5)
    print(x.__class__, x.get())
    x.append(6)
    print(x.data, x.get())
    for item in (7, 8, 9, 10):
        x.append(item)
    print(x.data, x.get())

The notable design choice in the implementation is that, since these objects undergo a nonreversible state transition at some point in their lifetimes—from non-full buffer to full-buffer (and behavior changes at that point)—I modeled that by changing self.__class__. This works even in Python 2.2, as long as both classes have the same slots (for example, it works fine for two classic classes, such as RingBuffer and __Full in this recipe).

Changing the class of an instance may be strange in many languages, but it is a Pythonic alternative to other ways of representing occasional, massive, irreversible, and discrete changes of state that vastly affect behavior, as in this recipe. Good thing that Python supports it for all kinds of classes.

Credit: Sébastien Keim

3
votes

You can also see this quite old Python recipe.

Here is my own version with NumPy array:

#!/usr/bin/env python

import numpy as np

class RingBuffer(object):
    """Fixed-capacity buffer on a NumPy array; the newest value is at index 0."""

    def __init__(self, size_max, default_value=0.0, dtype=float):
        """Allocate size_max slots of the given dtype, filled with default_value."""
        self.size_max = size_max

        storage = np.empty(size_max, dtype=dtype)
        storage.fill(default_value)
        self._data = storage

        # Number of values appended so far; stops growing at size_max.
        self.size = 0

    def _push(self, value):
        """Shift every slot one position to the right and store value at index 0."""
        self._data = np.roll(self._data, 1)
        self._data[0] = value

    def append(self, value):
        """Insert value as the newest element while the buffer is still filling."""
        self._push(value)
        self.size += 1

        # Once every slot has been written the instance switches to
        # RingBufferFull, whose append no longer touches the counter.
        if self.size == self.size_max:
            self.__class__ = RingBufferFull

    def get_all(self):
        """Return the whole backing array, newest element first."""
        return self._data

    def get_partial(self):
        """Return only the slots written so far, newest element first."""
        return self.get_all()[:self.size]

    def __getitem__(self, key):
        """Index straight into the backing array (0 is the newest element)."""
        return self._data[key]

    def __repr__(self):
        """Tab-separated: raw array, size, reversed array, reversed written part."""
        parts = [
            repr(self._data),
            str(self.size),
            repr(self.get_all()[::-1]),
            repr(self.get_partial()[::-1]),
        ]
        return '\t'.join(parts)


class RingBufferFull(RingBuffer):
    """State of a RingBuffer after all of its slots have been written."""

    def append(self, value):
        """Insert value as the newest element; the oldest one rolls off."""
        self._push(value)
2
votes

I've had this problem before doing serial programming. At the time just over a year ago, I couldn't find any efficient implementations either, so I ended up writing one as a C extension and it's also available on pypi under an MIT license. It's super basic, only handles buffers of 8-bit signed chars, but is of flexible length, so you can use Struct or something on top of it if you need something other than chars. I see now with a google search that there are several options these days though, so you might want to look at those too.

2
votes

This one does not require any library. It grows a list and then cycles within it by index.

The footprint is very small (no library), and it runs at least twice as fast as deque. This is indeed good for computing moving averages, but be aware that the items are not kept sorted by age as above.

class CircularBuffer(object):
    """List-backed buffer that grows up to `size`, then overwrites in place."""

    def __init__(self, size):
        """Start empty, with the write cursor at position 0."""
        self._data = []
        self.size = size
        self.index = 0

    def record(self, value):
        """Store value: append while growing, else overwrite at the cursor."""
        if len(self._data) != self.size:
            self._data.append(value)
        else:
            self._data[self.index] = value
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Return the element at storage position key (not ordered by age)."""
        return self._data[key]

    def __repr__(self):
        """Return the stored list followed by the number of stored items."""
        return '{!r} ({} items)'.format(self._data, len(self._data))

    def get_all(self):
        """Return the underlying list of stored elements."""
        return self._data

To get the average value, e.g.:

# Record 40000 integers in a buffer with room for one million, then report
# the capacity, the number of stored values and their integer average.
q = CircularBuffer(1000000)
for i in range(40000):
    q.record(i)
values = q.get_all()
# Single-argument print(...) gives the same output on Python 2 and 3, and
# floor division keeps the integer average shown in the results.
print("capacity= {}".format(q.size))
print("stored= {}".format(len(values)))
print("average= {}".format(sum(values) // len(values)))

Results in:

capacity= 1000000
stored= 40000
average= 19999

real 0m0.024s
user 0m0.020s
sys  0m0.000s

This is about 1/3 the time of the equivalent with deque.

2
votes

Your answer is not right. A circular buffer has three main principles (https://en.wikipedia.org/wiki/Circular_buffer):

  1. The length of the buffer is fixed;
  2. First in, first out;
  3. When you add or delete an item, the other items should not change their position

your code below:

def add_to_buffer( self, num ):
    # Remove the first (oldest) item, then append the new one at the end.
    # Every remaining item ends up one position earlier in the list.
    self.mylist.pop( 0 )
    self.mylist.append( num )

Let's consider the situation where the list is full, using your code:

self.mylist = [1, 2, 3, 4, 5]

now we append 6, the list is changed to

self.mylist = [2, 3, 4, 5, 6]

All the items in the list except 1 have changed their position.

Your code is a queue, not a circular buffer.

I think Basj's answer is the most efficient one.

By the way, a circular buffer can improve the performance of adding an item.

1
votes

From Github:

class CircularBuffer:
    """Fixed-size FIFO buffer that overwrites its oldest value when full.

    ``low`` is the position of the oldest stored value, ``high`` the position
    the next value is written to, and ``count`` the number of stored values.
    """

    def __init__(self, size):
        """Store buffer in given storage.

        :param size: capacity of the buffer (must be >= 1)
        :raises ValueError: if ``size`` is smaller than 1
        """
        # A zero-size buffer would report itself as full while empty and
        # then fail with ZeroDivisionError on the first add().
        if size < 1:
            raise ValueError("Circular Buffer size must be at least 1")
        self.buffer = [None]*size
        self.low = 0
        self.high = 0
        self.size = size
        self.count = 0

    def isEmpty(self):
        """Determines if buffer is empty."""
        return self.count == 0

    def isFull(self):
        """Determines if buffer is full."""
        return self.count == self.size

    def __len__(self):
        """Returns number of elements in buffer."""
        return self.count

    def add(self, value):
        """Adds value to buffer, overwrite as needed."""
        if self.isFull():
            # The oldest value is about to be overwritten, so the next
            # oldest one becomes the new head.
            self.low = (self.low+1) % self.size
        else:
            self.count += 1
        self.buffer[self.high] = value
        self.high = (self.high + 1) % self.size

    def remove(self):
        """Removes and returns oldest value from non-empty buffer.

        :raises IndexError: if the buffer is empty (IndexError is a subclass
            of Exception, so handlers catching Exception keep working)
        """
        if self.count == 0:
            raise IndexError("Circular Buffer is empty")
        value = self.buffer[self.low]
        # Reset the freed slot to its initial state so the buffer does not
        # keep the removed object alive.
        self.buffer[self.low] = None
        self.low = (self.low + 1) % self.size
        self.count -= 1
        return value

    def __iter__(self):
        """Return elements in the circular buffer in order using iterator."""
        idx = self.low
        num = self.count
        while num > 0:
            yield self.buffer[idx]
            idx = (idx + 1) % self.size
            num -= 1

    def __repr__(self):
        """String representation of circular buffer."""
        # Joining an empty sequence yields '', so no special case is needed
        # for an empty buffer.
        return 'cb:[' + ','.join(map(str, self)) + ']'

https://github.com/heineman/python-data-structures/blob/master/2.%20Ubiquitous%20Lists/circBuffer.py

0
votes

The original question asked for an "efficient" circular buffer. By that measure, the answer from aaronasterling seems to be definitively correct. Comparing the processing time of a dedicated class written in Python with that of collections.deque shows that deque is about 5.2 times faster! Here is very simple code to test this:

class cb:
    """Minimal pure-Python ring buffer over a preallocated list."""

    def __init__(self, size):
        # sz: capacity, i: next write position, b: zero-filled storage
        self.sz = size
        self.i = 0
        self.b = [0] * size

    def append(self, v):
        """Write v at the cursor, then advance the cursor with wrap-around."""
        cursor = self.i
        self.b[cursor] = v
        self.i = (cursor + 1) % self.sz

# Benchmark 1: push 10000 values through the pure-Python cb class
# (capacity 1000).
b = cb(1000)
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 1.097 second on my laptop

# Benchmark 2: the same workload with collections.deque; the second
# positional argument of deque is maxlen.
from collections import deque
b = deque( [], 1000 )
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 0.211 second on my laptop

To transform a deque into a list, just use:

# Copy the deque's items, in order, into a new list.
my_list = list(my_deque)

You will then get O(1) random access to the deque items. Of course, this is only valuable if you need to do many random accesses to the deque after having set it once.

0
votes

This applies the same principle to some buffers intended to hold the most recent text messages.

import time
import datetime
import sys, getopt

class textbffr(object):
    """Fixed-size buffer of text entries; index 0 is the oldest slot."""

    def __init__(self, size_max):
        """Create size_max empty-string slots with the cursor on the last one."""
        last = size_max - 1
        self.posn_max = last
        self._data = [""] * size_max
        # posn is the slot written most recently; starting on the last slot
        # makes the first append wrap around to slot 0.
        self.posn = last

    def append(self, value):
        """Advance the cursor (wrapping after the last slot) and store value."""
        self.posn = 0 if self.posn == self.posn_max else self.posn + 1
        self._data[self.posn] = value

    def __getitem__(self, key):
        """Return the element key positions after the oldest slot."""
        slot = key + self.posn + 1
        if slot > self.posn_max:
            # Past the end of the storage: wrap around to the front.
            slot = key - (self.posn_max - self.posn)
        return self._data[slot]


def print_bffr(bffr,bffer_max):
    """Print the non-empty entries bffr[0] .. bffr[bffer_max - 1], then a gap."""
    entries = (bffr[position] for position in range(bffer_max))
    for text in entries:
        if text != "":
            print(text)
    # print() adds its own newline, so this ends the listing with two of them.
    print('\n')

def make_time_text(time_value):
    """Return month, day, hour, minute and second as two-digit fields, joined."""
    fields = (
        time_value.month,
        time_value.day,
        time_value.hour,
        time_value.minute,
        time_value.second,
    )
    return "".join(str(field).zfill(2) for field in fields)


def main(argv):
    """Demo loop: keep the latest log and status messages and print them.

    Runs until interrupted.  ``argv`` is accepted for the conventional
    entry-point signature but is not used.
    """
    #Set things up
    log_max = 5
    status_max = 7
    log_bffr = textbffr(log_max)
    status_bffr = textbffr(status_max)
    scan_count = 1

    #Main Loop
    # every 2 seconds write a line with the time and the scan count.
    while True:

        time_text = make_time_text(datetime.datetime.now())
        #create next messages and store in buffers
        status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
        log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")

        #print whole buffers so far
        print_bffr(log_bffr,log_max)
        print_bffr(status_bffr,status_max)

        time.sleep(2)
        scan_count += 1

# Start the demo loop only when this file is run as a script, passing it the
# command-line arguments that follow the program name.
if __name__ == '__main__':
    main(sys.argv[1:])