2
votes

I'm trying out Multiple Producer - Multiple Consumer use case of Producer-Consumer problem. I'm using BlockingQueue for sharing common queue between multiple producers/consumers.

Below is my code.
Producer

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private BlockingQueue inputQueue;
    private static volatile int i = 0;
    private volatile boolean isRunning = true;

    public Producer(BlockingQueue q){
        this.inputQueue=q;
    }

    public synchronized void run() {

        //produce messages
        for(i=0; i<10; i++) 
        {
            try {
                inputQueue.put(new Integer(i));

                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Produced "+i);
        }
        finish();
    }

    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
    }

}

Consumer

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private BlockingQueue inputQueue;
    private volatile boolean isRunning = true;

    private final Integer POISON_PILL = new Integer(-1);

    Consumer(BlockingQueue queue) {
        this.inputQueue = queue;
    }

    public void run() {
        //worker loop keeps taking en element from the queue as long as the producer is still running or as 
        //long as the queue is not empty:
        while(!inputQueue.isEmpty()) {

            try {
                Integer queueElement = (Integer) inputQueue.take();
                System.out.println("Consumed : " + queueElement.toString());

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("Queue ");
    }

    //this is used to signal from the main thread that he producer has finished adding stuff to the queue
    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
        inputQueue.add(POISON_PILL);
    }
}

Test Class

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class ProducerConsumerService {

    public static void main(String[] args) {

        //Creating BlockingQueue of size 10
        BlockingQueue queue = new ArrayBlockingQueue(10);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        //starting producer to produce messages in queue
        new Thread(producer).start();

        //starting producer to produce messages in queue
        new Thread(producer).start();

        //starting consumer to consume messages from queue
        new Thread(consumer).start();

        //starting consumer to consume messages from queue
        new Thread(consumer).start();

        System.out.println("Producer and Consumer has been started");
    }

}

I don't see the correct output when I ran the below code.

Is there any mistake that I'm doing here ?

3

3 Answers

11
votes

There is quite a bit of your code which doesn't make sense. I suggest you sit down and work out why the code is there and what it is doing.

If you deleted the isFinshed flag, nothing would change.

If you deleted the use of synchronized in the producer you would have concurrent producers. There is no benefit in making a field which is only accessed in a synchronzied block volatile.

It makes no sense for producers to share a loop counter, if they are to be concurrent. Normally, a producer sends a poison pill, and a consumer doesn't consumer the pill. e.g. if you have two consumers, one might add the pill and the other might consume it. Your consumer ignores poison pills, as it ignores the isFinished flag.

You don't want to stop the consumer just because the queue is temporarily empty. Otherwise it will not see all the message the producer produces, possibly none of them.

7
votes

Sample code with multiple producers & multiple consumers.

import java.util.concurrent.*;

public class ProducerConsumerDemo {

    public static void main(String args[]){

     BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

     Thread prodThread1 = new Thread(new Producer(sharedQueue,1));
     Thread prodThread2 = new Thread(new Producer(sharedQueue,2));
     Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
     Thread consThread2 = new Thread(new Consumer(sharedQueue,2));

     prodThread1.start();
     prodThread2.start();
     consThread1.start();
     consThread2.start();
    }

}

class Producer implements Runnable {

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;

    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }

}

class Consumer implements Runnable{

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }

    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

Output:

Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Produced:23:by thread:2
Produced:24:by thread:2
Produced:25:by thread:2
Consumed: 11:by thread:1
Consumed: 22:by thread:1
Consumed: 23:by thread:1
Consumed: 24:by thread:1
Consumed: 25:by thread:1
Produced:12:by thread:1
Consumed: 21:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Produced:14:by thread:1
Produced:15:by thread:1
Consumed: 13:by thread:2
Consumed: 14:by thread:1
Consumed: 15:by thread:2

This article provides simple example producer and consumer problem with BlockingQueue

Changes to your code:

  1. Different producers will generate different output instead of same output. Producer Thread 1 generates numbers from 11-15 and Producer Thread 2 generates numbers from 21-25
  2. Any of Consumer thread can consume data from any of Producers. Unlike Producers, Consumers don't have constraints to consume the data.
  3. I have added Thread number in both Producer and Consumer.

You can find alternative solution with ExecutorService at :

Producer/Consumer threads using a Queue

0
votes

It’s not too hard when just implementing it straight forward. The example code below does it. It simply uses local variables for everything that is not supposed to be shared.

Besides the queue, only a thread safe counter maintaining the number of active producers is shared. The counter is used rather than a special “POISON_PILL” value as such a marker value does not work with a single queue and multiple consumers as all consumers have to recognize the finishing of the producer but only when all producers have finished.

The counter is a simple ending condition. The only thing to care about is that after detecting that the counter reached zero, the queue has to be re-checked to avoid race conditions.

As a side note, it doesn’t make sense to use concurrency features provided by Java 5 and not using Generics for clean type safe code.

final AtomicInteger activeProducers=new AtomicInteger();
final BlockingQueue<Integer> queue=new ArrayBlockingQueue<>(10);
Runnable producer=new Runnable() {
  public void run() {
    try {
      for(int i=0; i<10; i++) {
          Thread.sleep(TimeUnit.SECONDS.toMillis(1));
          queue.put(i);
          System.out.println("Produced "+i);
      }
    } catch(InterruptedException ex) {
      System.err.println("producer terminates early: "+ex);
    }
    finally { activeProducers.decrementAndGet(); }
  }
};
Runnable consumer=new Runnable() {
  public void run() {
    try {
      for(;;) {
        Integer queueElement = queue.poll(1, TimeUnit.SECONDS);
        if(queueElement!=null)
          System.out.println("Consumed : " + queueElement);
        else if(activeProducers.get()==0 && queue.peek()==null) return;
      }
    } catch(InterruptedException ex) {
      System.err.println("consumer terminates early: "+ex);
    }
  }
};
final int NUM_PRODUCERS = 2, NUM_CONSUMERS = 2;
for(int i=0; i<NUM_PRODUCERS; i++) {
  activeProducers.incrementAndGet();
  new Thread(producer).start();
}
for(int i=0; i<NUM_CONSUMERS; i++) {
  new Thread(consumer).start();
}