1
votes

I have been researching the producer-consumer design pattern with threads in Java. Since Java 5 introduced the BlockingQueue data structure, the pattern is much simpler to implement, because BlockingQueue provides the coordination implicitly through its blocking methods put() and take(). You no longer need wait() and notify() to communicate between the producer and the consumer: put() blocks if the queue is full (in the case of a bounded queue), and take() blocks if the queue is empty. I have developed the program below, but I would also like to know the old-style approach with wait() and notify(), because I want to implement the same logic that way too.

Please advise how this can be implemented the classical way, using wait() and notify() to communicate between the producer and consumer threads and blocking each of them on its own condition (a full queue for the producer, an empty queue for the consumer).

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

    public static void main(String args[]){

     //Creating shared object (no capacity is given, so this queue is effectively unbounded)
     BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));

     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
    }

}

//Producer Class in java
class Producer implements Runnable {

    private final BlockingQueue<Integer> sharedQueue;

    public Producer(BlockingQueue<Integer> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                Thread.currentThread().interrupt(); // restore the interrupt status
                return;                             // and stop producing
            }
        }
    }

}

//Consumer Class in Java
class Consumer implements Runnable{

    private final BlockingQueue<Integer> sharedQueue;

    public Consumer (BlockingQueue<Integer> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                Thread.currentThread().interrupt(); // restore the interrupt status
                return;                             // and stop consuming
            }
        }
    }


}

Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9
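
Note that the LinkedBlockingQueue in the program above is created without a capacity, so put() practically never blocks there. The following is only a minimal sketch of the "bounded queue" case described in the question, assuming an ArrayBlockingQueue with a capacity of 2. The class name BoundedQueueDemo and the 100 ms timeout are illustrative choices, not part of the original program. A timed offer() is used instead of put() so that the demo gives up rather than blocking forever when the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original program.
class BoundedQueueDemo {

    // Fills a bounded queue, shows that a further insert is refused while it
    // is full, and that it succeeds again once one element has been taken.
    static String run() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // assumed capacity
        try {
            queue.put(1);
            queue.put(2);
            // The queue is now full. A plain put(3) would block here until a
            // consumer calls take(); the timed offer() gives up after 100 ms.
            boolean acceptedWhileFull = queue.offer(3, 100, TimeUnit.MILLISECONDS);
            int head = queue.take(); // frees one slot, returns the oldest element
            boolean acceptedAfterTake = queue.offer(3, 100, TimeUnit.MILLISECONDS);
            return acceptedWhileFull + "," + head + "," + acceptedAfterTake;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "false,1,true"
    }
}
```
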
3
What is the question? Is it part of homework? - amit
BlockingQueue was introduced in Java 5.0, 8 years ago, but it existed as a separate library for much longer than that. I don't imagine people using wait/notify to do what you suggest. In fact, I wouldn't use a BlockingQueue at all; you should be using an ExecutorService. - Peter Lawrey
@PeterLawrey, is there any advantage to using ExecutorService instead of BlockingQueue? - UVM
@PeterLawrey That's great, thanks for the advice. Could you please post how to achieve the same thing with an ExecutorService as well? That would make it clearer. - 1Crazy4Java
An ExecutorService wraps a queue and a thread pool (which can be single, fixed or cached). When you submit tasks to it, you can obtain a Future, which will give you the result or exception when it's available, and you can shut down the whole thing with one method call. It also supports scheduled and periodic recurring tasks (via ScheduledExecutorService). - Peter Lawrey

3 Answers

5
votes

If you want to know another way to do this, try using an ExecutorService:

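import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// The class name is illustrative; any name will do.
public class ExecutorServiceExample {

    public static void main(String... args) {
        // A single worker thread plays the role of the consumer.
        ExecutorService service = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100; i++) {
            System.out.println("Produced: " + i);

            final int finalI = i;
            service.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Consumed: " + finalI);
                }
            });
        }
        // Stop accepting new tasks; tasks already submitted still run.
        service.shutdown();
    }
}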

With just 10 tasks the producer can be finished before the consumer starts. If you try 100 tasks you may find them interleaved.
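
The comments on the question mention that a submitted task can hand back a Future. As a rough sketch of that idea (the class name FutureDemo and the squaring task are made up for illustration, they are not from the example above), the producer can submit Callable tasks and collect each result later with get():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the squaring task is only a stand-in for real work.
class FutureDemo {

    // Submits one task per number and returns the results in submission order.
    static List<Integer> squares(int count) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                final int n = i;
                futures.add(service.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        return n * n; // runs on the worker thread
                    }
                }));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // blocks until that task has finished
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // The task itself threw; its exception is the cause.
            throw new IllegalStateException(e.getCause());
        } finally {
            service.shutdown(); // one call stops the pool once its tasks are done
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(5)); // prints [0, 1, 4, 9, 16]
    }
}
```
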

2
votes

If you want to understand how a BlockingQueue works, for educational purposes, you can always have a look at its source code.

The simplest way could be to synchronize the offer() and take() methods, and when the queue is full and a thread tries to offer() an element, invoke wait(). When another thread takes an element, it calls notify() to wake the waiting thread. (The same idea applies when trying to take() from an empty queue.)
Remember to make sure all your wait() calls are nested in loops that check whether the condition is met each time the thread is awakened.
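
For illustration only, here is a minimal sketch of that idea, not how java.util.concurrent actually implements it. The names WaitNotifyBuffer and WaitNotifyDemo are invented for this example, the blocking insert is called put() here to match BlockingQueue's naming, and notifyAll() is used rather than notify() because producers and consumers wait on the same monitor.

```java
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical bounded buffer built on intrinsic locks and wait()/notifyAll().
class WaitNotifyBuffer<T> {
    private final Queue<T> items = new LinkedList<>();
    private final int capacity;

    WaitNotifyBuffer(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T item) throws InterruptedException {
        // Loop, not if: re-check the condition after every wake-up.
        while (items.size() == capacity) {
            wait(); // releases the monitor until another thread notifies
        }
        items.add(item);
        notifyAll(); // wake consumers waiting for an element
    }

    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // wake producers waiting for free space
        return item;
    }
}

class WaitNotifyDemo {

    // One producer thread puts 0..count-1; the caller consumes and sums them.
    static int transfer(final int count) {
        final WaitNotifyBuffer<Integer> buffer = new WaitNotifyBuffer<>(2);
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < count; i++) {
                        buffer.put(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(10)); // prints 45
    }
}
```

With a capacity of 2 the producer is forced to wait several times, so both wait loops are exercised even in this small run.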

If you are planning to implement it from scratch for production purposes, I'd strongly argue against it. You should use existing, tested libraries and components as much as possible.

1
votes

I can do this wait-notify stuff in my sleep (or at least I think I can). Java 1.4 source provided beautiful examples of all this, but they've switched to doing everything with atomics and it's a lot more complicated now. The wait-notify does provide flexibility and power, though the other methods can shield you from the dangers of concurrency and make for simpler code.

To do this, you want some fields, like so:

private final ConcurrentLinkedQueue<Integer>  sharedQueue =
                                                    new ConcurrentLinkedQueue<>();
private volatile   boolean  waitFlag = true;

Your Producer.run would look like this:

public void run()  {
    for (int i = 0; i < 100000; i++)  {
        System.out.println( "Produced: " + i );
        sharedQueue.add( Integer.valueOf( i ) );
        if (waitFlag)       // volatile access is cheaper than synch.
            synchronized (sharedQueue)  { sharedQueue.notifyAll(); }
    }
}

And Consumer.run:

public void run()  {
    waitFlag = false;
    for (;;)  {
        Integer  ic = sharedQueue.poll();
        if (ic == null)  {
            synchronized (sharedQueue)  {
                waitFlag = true;
                // An add might have come through before waitFlag was set.
                ic = sharedQueue.poll();
                if (ic == null)  {
                    try  { sharedQueue.wait(); }
                    catch (InterruptedException ex)  {
                        Thread.currentThread().interrupt();  // restore the interrupt status
                        return;                              // and stop consuming
                    }
                    waitFlag = false;
                    continue;
                }
                waitFlag = false;   // we got an element, so we are not about to wait
            }
        }
        System.out.println( "Consumed: " + ic );
    }
}

This keeps synchronizing to a minimum. If all goes well, there's only one look at a volatile field per add. You should be able to run any number of producers simultaneously. (Multiple consumers would be trickier: you'd have to give up waitFlag.) You could use a different object for wait/notifyAll.