I see many descriptions and examples of the producer-consumer pattern using queues as the shared data buffer. What is the reason for using a queue rather than a single element data buffer?
In every producer-consumer implementation the producer will either produce data faster than the consumer can process it or produce data slower than the consumer can process it. This means that a queue will eventually either fill to its maximum capacity or drain until it holds at most one data element, which is consumed as soon as it arrives. Once the queue is full, the producer must wait for the consumer before it can add another data element. Once the queue has drained, the consumer must wait for the producer before it can consume another element. In other words, the queue will behave like a single element buffer.
Queue manipulation requires far more processing overhead, and program complexity, than dealing with a single data element. There should be an advantage of some sort to justify the added complexity and overhead.
There are two broad sub-patterns of the producer-consumer pattern. The first is a synchronized producer-consumer pattern. This pattern assures that the consumer consumes each data element produced by the producer, and consumes it exactly once. The discussion above describes a synchronized pattern. The second is a sampling consumer.
The sampling consumer consumes only a sample of the data elements produced by the producer. If the consumer consumes data elements faster than the producer produces them, the consumer is said to over-sample the buffer. If the consumer consumes data elements slower than the producer produces them, the consumer is said to under-sample the buffer. An example of an under-sampling design would be a weather station which generates temperature readings at a rate of 100 Hz (100 times per second) while producing a report at a rate of 1 Hz (1 time per second). The consumer need not read 100 data points each second and produce an average of those readings. Instead it can take 1 reading per second and report that reading. If the consumer samples only 1% of the readings there is no need to provide a queue of data elements in the buffer.
Bounded queues are often used to control memory consumption. Unbounded queues always come with the possibility of running out of heap memory, which can result in application failure.
A bounded multi-element queue may seem like a good idea when there are many producers. If all the data produced by the many producers needs to be processed in exactly the same manner then it may be reasonable to write all that data to a common buffer so that one or more consumers can process the data. There are a few costs associated with a multi-element queue. A multi-element queue uses memory at a rate approximately equal to the size of each data element multiplied by the number of elements in the bounded queue. A multi-element queue requires more complex logic than a single element queue. Complexity is always the enemy of correctness.
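To make the extra logic concrete, the following is a minimal sketch of a bounded multi-element queue written as an Ada protected object. It is one possible ring-buffer layout and not the only way to build such a queue. The names Bounded_Queue_Sketch, Capacity, Index, Slots, Queue, Put and Get are assumptions chosen for illustration, as is the capacity of 4.

```ada
with Ada.Text_IO; use Ada.Text_IO;

procedure Bounded_Queue_Sketch is
   Capacity : constant := 4;            -- hypothetical queue size
   type Index is mod Capacity;          -- indices wrap around automatically
   type Slots is array (Index) of Integer;

   protected Queue is
      entry Put (Item : in Integer);    -- blocks while the queue is full
      entry Get (Item : out Integer);   -- blocks while the queue is empty
   private
      Data  : Slots;
      Head  : Index   := 0;             -- slot holding the oldest element
      Tail  : Index   := 0;             -- slot that receives the next element
      Count : Natural := 0;             -- number of elements currently stored
   end Queue;

   protected body Queue is
      entry Put (Item : in Integer) when Count < Capacity is
      begin
         Data (Tail) := Item;
         Tail  := Tail + 1;             -- modular arithmetic wraps to 0
         Count := Count + 1;
      end Put;

      entry Get (Item : out Integer) when Count > 0 is
      begin
         Item  := Data (Head);
         Head  := Head + 1;
         Count := Count - 1;
      end Get;
   end Queue;

   Value : Integer;
begin
   -- Fill the queue to capacity, then drain it in FIFO order.
   for I in 1 .. Capacity loop
      Queue.Put (I);
   end loop;
   for I in 1 .. Capacity loop
      Queue.Get (Value);
      Put_Line ("Got" & Integer'Image (Value));
   end loop;
end Bounded_Queue_Sketch;
```

In this sketch the protected object carries an array, two indices and a counter, and its memory use grows with Capacity. A single element buffer needs only one value and, when it must block, one flag.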
When a multi-element queue is used the consumer always reads the data element that has been in the queue the longest time. A queue is, after all, a FIFO data structure. Producers write new elements to the queue until the queue fills. At that point either the producers wait for space on the queue or they overwrite the newest element on the queue. The consumer still reads the oldest element on the queue, no matter what the producers are doing.
When the queue fills the behavior of the queue is exactly like the behavior of a single element queue. In a blocking producer/consumer pattern a full queue forces the producer to wait for a consumer to read from the queue. A single element queue is always either full or empty. A full multi-element queue simply contains many data elements waiting to be eligible to be consumed, plus a single element which is eligible to be consumed.
A single element queue simply eliminates the time in the queue for data waiting to be eligible.
On the consumer side of the pattern, the consumer can only read the oldest element on the queue, no matter how many elements are in the queue. Either an element is available to the consumer or it is not. The size of the queue is invisible to the consumer. If the queue is empty then the consumer must either suspend waiting for available data or it must actively sample the queue for available data. For a sampling queue, as mentioned above, the consumer can simply process the oldest value on the queue, and the queue never marks itself as empty.
It is very difficult to implement a sampling consumer with a multi-element queue. A single element queue will do. The producer simply overwrites whatever is in the queue and the consumer reads whatever is in the queue.
Following is an example of a sampling producer/consumer pattern written in Ada.
------------------------------------------------------------------
-- Sampling Consumer --
------------------------------------------------------------------
with Ada.Text_IO; use Ada.Text_IO;

procedure Sampling_PC is
   protected Buf is
      procedure Write(Item : in Integer);
      function Read return Integer;
      procedure Set_Done;
      function Get_Done return Boolean;
   private
      Value : Integer := Integer'First;
      Is_Done : Boolean := False;
   end Buf;

   protected body Buf is
      procedure Write(Item : in Integer) is
      begin
         Value := Item;
      end Write;

      function Read return Integer is
      begin
         return Value;
      end Read;

      procedure Set_Done is
      begin
         Is_Done := True;
      end Set_Done;

      function Get_Done return Boolean is
      begin
         return Is_Done;
      end Get_Done;
   end Buf;

   task Consumer;

   task body Consumer is
   begin
      while not Buf.Get_Done loop
         Put_Line("Consumer read" & Integer'Image(Buf.Read));
      end loop;
   end Consumer;

begin
   for I in 1..10 loop
      Put_Line("Producer writing" & Integer'Image(I));
      Buf.Write(I);
   end loop;
   Buf.Set_Done;
end Sampling_PC;
A few words of explanation might be needed for those unfamiliar with Ada tasking. In the example above Buf is a protected object. In Ada terms a protected object is an object used as a shared buffer between tasks (similar to threads). Each buffer implements methods to access its internal data elements. The kinds of methods are procedures, entries, and functions.
A procedure has unconditional read/write access to the protected object. Each procedure automatically manipulates a read/write lock on the protected object.
An entry is very much like a procedure except that it adds a controlling condition, much like a condition variable in threads. An entry not only implements a read/write lock to prevent multiple simultaneous writers and overlapping read/write operations, it also implements a queue for tasks waiting for the condition to become TRUE.
A function for a protected object provides read-only access to the protected object. Functions automatically manipulate shared read locks so that multiple tasks may simultaneously read from the protected object. Simultaneous reads cannot corrupt the protected object.
The example above uses only procedures and functions. The Consumer task reads the protected object through functions, while the producer, which is the program's main task in this case, writes to the protected object using procedures.
An example of a producer-consumer pattern using multiple producers and multiple consumers is:
------------------------------------------------------------------
-- Multiple producers and consumers sharing the same buffer --
------------------------------------------------------------------
with Ada.Text_IO; use Ada.Text_IO;

procedure N_Prod_Con is
   protected Buffer is
      entry Write(Item : in Integer);
      entry Read(Item : out Integer);
   private
      Value : Integer := Integer'Last;
      Is_New : Boolean := False;
   end Buffer;

   protected body Buffer is
      entry Write(Item : in Integer) when not Is_New is
      begin
         Value := Item;
         Is_New := True;
      end Write;

      entry Read(Item : out Integer) when Is_New is
      begin
         Item := Value;
         Is_New := False;
      end Read;
   end Buffer;

   task type Producers(Id : Positive) is
      entry Stop;
   end Producers;

   task body Producers is
      Num : Positive := 1;
   begin
      loop
         select
            accept Stop;
            exit;
         or
            delay 0.0001;
         end select;
         Put_Line("Producer" & Integer'Image(Id) & " writing" & Integer'Image(Num));
         Buffer.Write(Num);
         Num := Num + 1;
      end loop;
   end Producers;
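   task type Consumers(Id : Positive) is
      entry Stop;
   end Consumers;

   task body Consumers is
      Num : Integer;
   begin
      loop
         select
            accept Stop;
            exit;
         or
            delay 0.0001;
         end select;
         -- Timed entry call: give up after a short wait so the task returns
         -- to the select above and can accept Stop even when the buffer
         -- stays empty because every producer has already stopped.
         select
            Buffer.Read(Num);
            Put_Line("Consumer" & Integer'Image(Id) & " read" & Integer'Image(Num));
         or
            delay 0.001;
         end select;
      end loop;
   end Consumers;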

   P1 : Producers(1);
   P2 : Producers(2);
   P3 : Producers(3);
   C1 : Consumers(1);
   C2 : Consumers(2);
   C3 : Consumers(3);
begin
   delay 0.2;
   P1.Stop;
   P2.Stop;
   P3.Stop;
   C1.Stop;
   C2.Stop;
   C3.Stop;
end N_Prod_Con;