0
votes

I'm trying to solve the producer consumer problem with threads in java, but the code won't run in parallell/concurrently. The producer always fills up the buffer completely before the consumer starts to consume, and I don't get why. The point is trying to do it using only synchronized blocks, wait() and notify().

Main :

    String [] data = {"Fisk", "Katt", "Hund", "Sau", "Fugl", "Elg", "Tiger", 
               "Kameleon", "Isbjørn", "Puma"};
    ProducerConsumer pc = new ProducerConsumer(5);
    Thread[] thrds = new Thread[2];
    thrds[0] = new Thread(new MyThread1(pc, data)); // producer
    thrds[1] = new Thread(new MyThread2(pc)); // consumer
    thrds[0].start();
    thrds[1].start();
    for(int i = 0; i < 2; i++) { // wait for all threads to die
        try { 
            thrds[i].join(); 
        } 
        catch (InterruptedException ie) {}
    }
    System.exit(0);

ProducerConsumer.java:

import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumer implements Runnable {
    private int bufferSize;
    private Queue<String> buffer;

public ProducerConsumer(int size) {

    bufferSize = size;
    buffer = new LinkedList<String>();
}

public void produce(String item) throws InterruptedException {
        synchronized(buffer) {
            while (buffer.size() >= bufferSize) {
                try {
                    System.out.println("Full buffer. Waiting for consumer...");
                    buffer.wait();
                }catch (Exception e) {}
            }
            buffer.add(item);
            System.out.println("Producer is putting " + item + " in the buffer");
            buffer.notify();
        }   
}

public void consume() throws InterruptedException {
    synchronized (buffer) {
        while (buffer.size() == 0) {
            try {
                System.out.println("Empty buffer. Waiting for production...");
                buffer.wait();
            }catch (Exception e) {}
        }
        System.out.println("Consumer is consuming " +  buffer.remove() + ".");
        buffer.notify();
    }
}

@Override
public void run() {
}

}

MyThread1 :

/*
 * PRODUCER - Thread
 */
public class MyThread1 implements Runnable {

private String [] data;
private ProducerConsumer pc;

public MyThread1(ProducerConsumer pc, String [] data) {
    this.pc = pc;
    this.data = data;
}
@Override
public void run() {
    for (int i = 0; i < data.length; i++) {
        try {
            pc.produce(data[i]);
        } catch (InterruptedException ex) {}
    }
}

}

MyThread2:

//THE CONSUMER - Thread

public class MyThread2 implements Runnable{

private ProducerConsumer pc;

public MyThread2(ProducerConsumer pc) {
    this.pc = pc;
}

//Run consume
@Override
public void run() {
    while (true) {
        try {
            pc.consume();
            Thread.sleep(2);
        }
        catch(InterruptedException e) {}

    }

}
}
2
I think because of synchronized(buffer)Naman Gala
A) Is that some kind of assignment or do you have other reasons not to use a BlockingQueue? B) there is no code to prevent that the producer is being as fast as it is. If you want that to happen, maybe let it sleep or so? Threads have no specified timing behavior & the first thread started may complete before you even start the second.zapl
And why does ProducerConsumer implements Runnable?Naman Gala
Yes, it's an assignment. I forgot to mention in the question that I am only to use synchronized blocks, wait() and notify(). I edited it in now.user1784297
@NamanGala : That was a last effort on my part to see if it would change anything.user1784297

2 Answers

0
votes

On recent machines, with short queues like this, you will never see actual multithreading effects like, in this case, producer and consumer taking turns unless you slow both of them down a bit. You only slowed down the consumer. Instead of using a short array, put a million Integers in a queue and see what happens.

0
votes
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ProduserConsumerDemo {

    public static void main(String[] args) {
        List<Integer> list = new CopyOnWriteArrayList<>();
        int size = 5;
        Producer producer = new Producer(list, size);
        Consumer consumer = new Consumer(list);
        Thread t1 = new Thread(producer, "Producer");
        Thread t2 = new Thread(consumer, "Consumer");
        t1.start();
        t2.start();
    }
}

class Producer implements Runnable {
    private final List<Integer> list;
    private final int size;

    public Producer(List<Integer> list, final int size) {
        this.list = list;
        this.size = size;
    }

    public void run() {
        try {
            produce();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void produce() throws InterruptedException {
        int i = 0;
        while (i >= 0) {
            synchronized (list) {
                while (list.size() == size) {
                    System.out.println(
                            "List is full." + Thread.currentThread().getName() + " is waiting. Size:" + list.size());
                    list.wait();
                }
                System.out.println("Produce :" + i);
                list.add(i++);
                Thread.sleep(50);
                list.notify();
            }
        }
    }
}

class Consumer implements Runnable {
    private final List<Integer> list;

    public Consumer(List<Integer> list) {
        this.list = list;
    }

    public void run() {
        try {
            consume();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void consume() throws InterruptedException {
        while (true) {
            synchronized (list) {
                while (list.isEmpty()) {
                    System.out.println(
                            "List is empty. " + Thread.currentThread().getName() + " is waiting. Size:" + list.size());
                    list.wait();
                }
                System.out.println("Consumed item:" + list.remove(0));
                Thread.sleep(50);
                list.notify();
            }
        }
    }
}