
I see many descriptions and examples of the producer-consumer pattern using queues as the shared data buffer. What is the reason for using a queue rather than a single element data buffer?

In every producer-consumer implementation either the producer produces data faster than the consumer can process it, or the producer produces data more slowly than the consumer can process it. This means that a queue will eventually either fill to its maximum capacity or contain only one data element before that element is consumed by the consumer. Once the queue is full the producer must wait for the consumer before it can add another data element. When the queue contains only one element the consumer must wait for the producer before it can consume another element. In other words, the queue will behave like a single element buffer.

Queue manipulation requires far more processing overhead, and program complexity, than dealing with a single data element. There should be an advantage of some sort to justify the added complexity and overhead.

There are two broad sub-patterns of the producer-consumer pattern. The first is a synchronized producer-consumer pattern. This pattern assures that the consumer consumes each data element produced by the producer, and consumes it exactly once. A synchronized pattern is discussed above. The second is a sampling consumer.

The sampling consumer consumes only a sample of the data elements produced by the producer. If the consumer consumes data elements faster than the producer produces them, the consumer is said to over-sample the data. If the consumer consumes data elements more slowly than the producer produces them, the consumer is said to under-sample the data. An example of an under-sampling design is a weather station which generates temperature readings at a rate of 100 Hz (100 times per second) while producing a report at a rate of 1 Hz (1 time per second). The consumer need not read 100 data points each second and produce an average of those readings. Instead it can take 1 reading per second and report that reading. If the consumer samples only 1% of the readings there is no need to provide a queue of data elements in the buffer.
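
As an illustration of that weather-station case, here is a minimal sketch in Ada. The Weather_Station, Latest, Sensor, and Reporter names, the rates, and the simulated readings are all invented for this example, not taken from a real system. The sensor task overwrites a single-element buffer roughly 100 times per second, while the reporter reads whatever value is current once per second; the other 99 readings are simply never looked at.

------------------------------------------------------------------
-- Under-Sampling Weather Station (illustrative sketch) --
------------------------------------------------------------------
with Ada.Text_IO; use Ada.Text_IO;

procedure Weather_Station is
   protected Latest is
      procedure Write(Reading : in Float);
      function Read return Float;
   private
      Value : Float := 0.0;
   end Latest;

   protected body Latest is
      procedure Write(Reading : in Float) is
      begin
         Value := Reading; -- overwrite; unread readings are simply dropped
      end Write;
      function Read return Float is
      begin
         return Value;
      end Read;
   end Latest;

   task Sensor;   -- produces at roughly 100 Hz
   task body Sensor is
      Reading : Float := 20.0;
   begin
      for I in 1 .. 300 loop
         Reading := Reading + 0.01; -- stand-in for reading a real sensor
         Latest.Write(Reading);
         delay 0.01;                -- about 100 samples per second
      end loop;
   end Sensor;

   task Reporter; -- consumes at 1 Hz
   task body Reporter is
   begin
      for I in 1 .. 3 loop
         delay 1.0;
         Put_Line("Temperature:" & Float'Image(Latest.Read));
      end loop;
   end Reporter;

begin
   null; -- the main task simply waits for Sensor and Reporter to finish
end Weather_Station;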

Bounded queues are often used to control memory consumption. Unbounded queues always come with the possibility of running out of heap memory, which can result in application failure.

A bounded multi-element queue may seem like a good idea when there are many producers. If all the data produced by the many producers needs to be processed in exactly the same manner then it may be reasonable to write all that data to a common buffer so that one or more consumers can process the data. There are a few costs associated with a multi-element queue. A multi-element queue uses memory at a rate approximately equal to the size of each data element multiplied by the number of elements in the bounded queue. A multi-element queue requires more complex logic than a single element queue. Complexity is always the enemy of correctness.

When a multi-element queue is used the consumer always reads the data element that has been in the queue the longest time. A queue is, after all, a FIFO data structure. Producers write new elements to the queue until the queue fills. At that point either the producers wait for space on the queue or they overwrite the newest element on the queue. The consumer still reads the oldest element on the queue, no matter what the producers are doing.
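
For concreteness, here is a minimal sketch of such a bounded FIFO in Ada. The Bounded_PC name, the capacity of 4, and the task bodies are invented for illustration, and only a single producer is shown, though several producer tasks could call Write on the same object; the overwrite-the-newest variant mentioned above is not shown. Write blocks when the buffer is full, Read blocks when it is empty, and Read always hands back the oldest element.

------------------------------------------------------------------
-- Bounded FIFO Buffer (illustrative sketch) --
------------------------------------------------------------------
with Ada.Text_IO; use Ada.Text_IO;

procedure Bounded_PC is
   Capacity : constant := 4;
   type Buffer_Index is mod Capacity;
   type Buffer_Array is array (Buffer_Index) of Integer;

   protected Bounded_Buffer is
      entry Write(Item : in Integer);
      entry Read(Item : out Integer);
   private
      Data  : Buffer_Array;
      Head  : Buffer_Index := 0; -- position of the oldest element
      Tail  : Buffer_Index := 0; -- next free slot
      Count : Natural := 0;
   end Bounded_Buffer;

   protected body Bounded_Buffer is
      entry Write(Item : in Integer) when Count < Capacity is
      begin
         Data(Tail) := Item;
         Tail := Tail + 1; -- the modular index type wraps around automatically
         Count := Count + 1;
      end Write;
      entry Read(Item : out Integer) when Count > 0 is
      begin
         Item := Data(Head);
         Head := Head + 1;
         Count := Count - 1;
      end Read;
   end Bounded_Buffer;

   task Producer;
   task body Producer is
   begin
      for I in 1 .. 10 loop
         Bounded_Buffer.Write(I); -- blocks once four elements are waiting
      end loop;
   end Producer;

   task Consumer;
   task body Consumer is
      Value : Integer;
   begin
      for I in 1 .. 10 loop
         Bounded_Buffer.Read(Value); -- always returns the oldest element
         Put_Line("Consumed" & Integer'Image(Value));
      end loop;
   end Consumer;

begin
   null;
end Bounded_PC;

In this sketch the producer can run ahead of the consumer by at most four elements; after that it waits, exactly as described above.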

When the queue fills, its behavior is exactly like that of a single element queue. In a blocking producer/consumer pattern a full queue forces the producer to wait for a consumer to read from the queue. A single element queue is always either full or empty. A full multi-element queue simply contains many data elements waiting to become eligible to be consumed, plus a single element which is eligible to be consumed.

A single element queue simply eliminates the time in the queue for data waiting to be eligible.

On the consumer side of the pattern, the consumer can only read the oldest element on the queue, no matter how many elements are in the queue. Either an element is available to the consumer or it is not. The size of the queue is invisible to the consumer. If the queue is empty then the consumer must either suspend waiting for available data or it must actively sample the queue for available data. For a sampling queue, as mentioned above, the consumer can simply process the oldest value on the queue, and the queue never marks itself as empty.

It is very difficult to implement a sampling consumer with a multi-element queue. A single element queue will do. The producer simply overwrites whatever is in the queue and the consumer reads whatever is in the queue.

Following is an example of a sampling producer/consumer pattern written in Ada.

------------------------------------------------------------------
-- Sampling Consumer --
------------------------------------------------------------------
with Ada.Text_IO; use Ada.Text_IO;

procedure Sampling_PC is
   protected Buf is
      procedure Write(Item : in Integer);
      function Read return Integer;
      procedure Set_Done;
      function Get_Done return Boolean;
   private
      Value : Integer := Integer'First;
      Is_Done : Boolean := False;
   end Buf;

   protected body Buf is
      procedure Write(Item : in Integer) is
      begin
         Value := Item;
      end Write;
      function Read return Integer is
      begin
         return Value;
      end Read;
      procedure Set_Done is
      begin
         Is_Done := True;
      end Set_Done;
      function Get_Done return Boolean is
      begin
         return Is_Done;
      end Get_Done;
   end Buf;

   task Consumer;

   task body Consumer is
   begin
      while not Buf.Get_Done loop
         Put_Line("Consumer read" & Integer'Image(Buf.Read));
      end loop;
   end Consumer;

begin
   for I in 1..10 loop
      Put_Line("Producer writing" & Integer'Image(I));
      Buf.Write(I);
   end loop;
   Buf.Set_Done;
end Sampling_PC;

A few words of explanation might be needed for those unfamiliar with Ada tasking. In the example above Buf is a protected object. In Ada terms a protected object is an object used as a shared buffer between tasks (which are similar to threads). Each protected object provides operations to access its internal data; the kinds of operations are procedures, entries, and functions. A procedure has unconditional read/write access to the protected object, and each procedure call automatically acquires and releases a read/write lock on the object. An entry is very much like a procedure except that it adds a controlling condition, much like a condition variable used with threads. An entry not only takes the read/write lock to prevent multiple simultaneous writers and overlapping read/write operations, it also maintains a queue of tasks waiting for its condition to become True. A function provides read-only access to the protected object. Functions automatically acquire a shared read lock so that multiple tasks may read from the protected object simultaneously; simultaneous reads cannot corrupt the protected object.

The example above uses only procedures and functions. The Consumer task reads the protected object through its functions, while the producer, which in this case is the program's main task, writes to the protected object using its procedures.

An example of a producer-consumer pattern using multiple producers and multiple consumers is:

------------------------------------------------------------------
-- Multiple producers and consumers sharing the same buffer --
------------------------------------------------------------------
with Ada.Text_IO; use Ada.Text_IO;

procedure N_Prod_Con is
   protected Buffer is
      entry Write(Item : in Integer);
      entry Read(Item : out Integer);
   private
      Value  : Integer := Integer'Last;
      Is_New : Boolean := False;
   end Buffer;
   protected body Buffer is
      entry Write(Item : in Integer) when not Is_New is
      begin
         Value := Item;
         Is_New := True;
      end Write;
      entry Read(Item : out Integer) when Is_New is
      begin
         Item := Value;
         Is_New := False;
      end Read;
   end Buffer;

   task type Producers(Id : Positive) is
      entry Stop;
   end Producers;
   task body Producers is
      Num : Positive := 1;
   begin
      loop
         select
            accept Stop;
            exit;
         or
            delay 0.0001;
         end select;
         Put_Line("Producer" & Integer'Image(Id) & " writing" & Integer'Image(Num));
         Buffer.Write(Num);
         Num := Num + 1;
      end loop;
   end Producers;

   task type Consumers(Id : Positive) is
      entry Stop;
   end Consumers;

   task body Consumers is
      Num : Integer;
   begin
      loop
         select
            accept Stop;
            exit;
         or
            delay 0.0001;
         end select;
         --  Use a timed entry call so the consumer does not block forever in
         --  Read after the producers have stopped writing.
         select
            Buffer.Read(Num);
            Put_Line("Consumer" & Integer'Image(Id) & " read" & Integer'Image(Num));
         or
            delay 0.0001;
         end select;
      end loop;
   end Consumers;
   P1 : Producers(1);
   P2 : Producers(2);
   P3 : Producers(3);
   C1 : Consumers(1);
   C2 : Consumers(2);
   C3 : Consumers(3);
begin
   delay 0.2;
   P1.Stop;
   P2.Stop;
   P3.Stop;
   C1.Stop;
   C2.Stop;
   C3.Stop;
end N_Prod_Con;
The producer-consumer pattern is also about asynchronous consumption of data messages. The case where there is a drop in producer activity or a rise in consumer activity is the best case of this pattern, because if there is no data in the queue the consumer threads will simply wait for the producers to produce data elements. The worst case of the pattern is when the producer has a spike. Though a pool of threads can be used, there is a limit to the number of threads in the pool. – S R Chaitanya
Since application design is always done to handle worst case scenarios, which in the producer-consumer pattern is a spike in producer activity or a drop in consumer activity, the queue is the best choice. – S R Chaitanya
If application design is always done to handle worst case scenarios then bounded queues will never be used, because the worst case fills the bounded queue. On the other hand, dynamically allocated queues always fail in the worst case, when memory is exhausted. Eventually one may discover that the use of files offers more flexibility than the use of queues. – Jim Rogers
The argument for use of a queue is based upon a claim of performance, but performance is not improved by the size of the queue, rather by the number of rate-limiting threads that can be applied to the problem. If data production spikes occur, a queue will not increase the speed of the consumers. The consumers will operate at whatever their rate is, whether the queue contains many elements or one element. – Jim Rogers
The reason a bounded queue needs to be used is to make sure that the application does not run out of memory. The number of messages that can be put in a bounded queue is pre-determined and depends on things like the number of concurrent users, the number of consumers, the type of messages that need to be consumed, etc. I would like to understand more about your idea of a single element buffer. – S R Chaitanya

1 Answer


"In every producer-consumer implementation either the producer produces data faster than the consumer can process it, or the producer produces data more slowly than the consumer can process it. This means that a queue will eventually either fill to its maximum capacity or contain only one data element before that element is consumed by the consumer."

This assumes that the producers produce at a constant rate, and the consumers consume at a constant rate. It also assumes that the number of consumers is fixed, which isn't necessarily the case with, for example, thread pool implementations that can vary the number of threads in response to load.

Once we relax all these assumptions, having a queue can make sense. Among other things, it allows the system to gracefully deal with temporary spikes in demand.
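
As a sketch of that point, Ada 2012's standard bounded synchronized queue can absorb a short burst without blocking the producer, while the consumer drains it at its own pace. The Burst_Demo name, the capacity of 32, the burst size, and the timing constants below are arbitrary choices for illustration.

------------------------------------------------------------------
-- Absorbing a Temporary Spike with a Bounded Queue (sketch) --
------------------------------------------------------------------
with Ada.Containers.Synchronized_Queue_Interfaces;
with Ada.Containers.Bounded_Synchronized_Queues;
with Ada.Text_IO; use Ada.Text_IO;

procedure Burst_Demo is
   package Int_Interfaces is
      new Ada.Containers.Synchronized_Queue_Interfaces(Integer);
   package Int_Queues is
      new Ada.Containers.Bounded_Synchronized_Queues(Int_Interfaces, 32);

   Msg_Queue : Int_Queues.Queue; -- bounded: holds at most 32 elements

   task Producer;
   task body Producer is
   begin
      for Burst in 1 .. 3 loop
         -- A short spike of 20 messages; it fits in the queue, so the
         -- producer is never blocked while the spike lasts.
         for I in 1 .. 20 loop
            Msg_Queue.Enqueue(Burst * 100 + I);
         end loop;
         delay 1.0; -- quiet period between spikes
      end loop;
   end Producer;

   task Consumer;
   task body Consumer is
      Item : Integer;
   begin
      for I in 1 .. 60 loop
         Msg_Queue.Dequeue(Item); -- blocks while the queue is empty
         Put_Line("Consumed" & Integer'Image(Item));
         delay 0.05;              -- drains more slowly than the spike arrives
      end loop;
   end Consumer;

begin
   null;
end Burst_Demo;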

This is not to say that a queue of capacity one (either blocking or overwriting) doesn't have its uses. In some circumstances it is precisely what's required.