0
votes

Following http://en.wikipedia.org/wiki/Producer-consumer_problem, I want to simulate the producer/consumer problem using semaphores. I am getting a deadlock and I don't know what the problem is.

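import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Main { // class declaration was missing from the post; the name is assumed
    public static void main(String[] args) {
        CustomBlockingQueue blockingQueue = new CustomBlockingQueue();
        new Thread(new Producer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
    }
}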

@SuppressWarnings("serial")
class CustomBlockingQueue extends LinkedList<Object> {
    private static final int MAX_SIZE = 10;

    private Semaphore mutex = new Semaphore(1);
    private Semaphore fillCount = new Semaphore(0);
    private Semaphore emptyCount = new Semaphore(MAX_SIZE);

    @Override
    public boolean offer(Object e) {
        try {
            mutex.acquire();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        boolean result = super.offer(e);
        System.out.println("offer " + size());
        try {
            fillCount.release();
            emptyCount.acquire();
            mutex.release();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        return result;
    }

    @Override
    public Object poll() {
        try {
            mutex.acquire();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        Object result = super.poll();
        System.out.println("poll  " + size());
        try {
            emptyCount.release();
            fillCount.acquire();
            mutex.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

class Producer implements Runnable {
    private CustomBlockingQueue blockingQueue;
    private Random random = new Random();

    public Producer(CustomBlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(2));
                blockingQueue.offer(new Object());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private CustomBlockingQueue blockingQueue;
    private Random random = new Random();

    public Consumer(CustomBlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(4));
                blockingQueue.poll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Using semaphores

Semaphores solve the problem of lost wakeup calls. In the solution below we use two semaphores, fillCount and emptyCount, to solve the problem. fillCount is the number of items to be read in the buffer, and emptyCount is the number of available spaces in the buffer where items could be written. fillCount is incremented and emptyCount decremented when a new item has been put into the buffer. If the producer tries to decrement emptyCount while its value is zero, the producer is put to sleep. The next time an item is consumed, emptyCount is incremented and the producer wakes up. The consumer works analogously.
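
The counting behaviour described in that passage can be sketched with java.util.concurrent.Semaphore. This is only an illustration: the names SemaphoreCountsDemo, tryProduce and tryConsume are made up here, a capacity of 2 is assumed, and the non-blocking tryAcquire() stands in for acquire() so the demo reports where a thread would be put to sleep instead of actually blocking.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo; the class and method names are illustrative only.
class SemaphoreCountsDemo {
    // Producer step: claim a free slot, then announce one more item.
    // Returns false where a blocking acquire() would put the producer to sleep.
    static boolean tryProduce(Semaphore emptyCount, Semaphore fillCount) {
        if (!emptyCount.tryAcquire()) {
            return false; // buffer is full
        }
        fillCount.release();
        return true;
    }

    // Consumer step: claim an item, then hand its slot back.
    // Returns false where a blocking acquire() would put the consumer to sleep.
    static boolean tryConsume(Semaphore emptyCount, Semaphore fillCount) {
        if (!fillCount.tryAcquire()) {
            return false; // buffer is empty
        }
        emptyCount.release();
        return true;
    }

    public static void main(String[] args) {
        Semaphore fillCount = new Semaphore(0);  // items available to read
        Semaphore emptyCount = new Semaphore(2); // free slots (capacity 2 assumed)

        System.out.println(tryConsume(emptyCount, fillCount)); // false: nothing to read yet
        System.out.println(tryProduce(emptyCount, fillCount)); // true
        System.out.println(tryProduce(emptyCount, fillCount)); // true
        System.out.println(tryProduce(emptyCount, fillCount)); // false: buffer is full
        System.out.println(tryConsume(emptyCount, fillCount)); // true: frees a slot
        System.out.println(tryProduce(emptyCount, fillCount)); // true: the slot is reused
    }
}
```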

2
save yourself headaches: use thread safe queues. - jldupont
Agreed - this is all implemented for you in the libraries - no need to do it the hard way! - DNA
I want to solve this problem myself, intentionally using semaphores - ASD
BTW @ASD, I'd recommend doing your .release() calls in a finally {} block. - Gray

2 Answers

2
votes

Your locking is in the wrong order. For offer, the sequence needs to be:

        emptyCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        fillCount.release();

A similar change is needed for poll:

        fillCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        emptyCount.release();

In your implementation you wait on the counting semaphores while holding the mutex. That causes the deadlock: the other thread can be blocked waiting for the mutex, and it needs the mutex before it can release the semaphore you are waiting on.
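
Here is a minimal sketch of that ordering as a standalone class. It is not the asker's exact code: the class name SemaphoreQueue and the methods put and take are hypothetical, it wraps an ArrayDeque instead of extending LinkedList, and it uses acquireUninterruptibly() for the mutex so that an interrupt cannot strand a permit between the two acquires. The mutex is released in a finally block, as one of the comments on the question recommends.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical sketch; names are illustrative and not from the original post.
class SemaphoreQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final Semaphore mutex = new Semaphore(1);     // guards 'items'
    private final Semaphore fillCount = new Semaphore(0); // items ready to take
    private final Semaphore emptyCount;                   // free slots

    SemaphoreQueue(int capacity) {
        emptyCount = new Semaphore(capacity);
    }

    void put(T item) throws InterruptedException {
        Objects.requireNonNull(item);   // ArrayDeque rejects null elements
        emptyCount.acquire();           // may block; the mutex is NOT held here
        mutex.acquireUninterruptibly(); // short critical section only
        try {
            items.add(item);
        } finally {
            mutex.release();
        }
        fillCount.release();            // wake a waiting consumer
    }

    T take() throws InterruptedException {
        fillCount.acquire();            // may block; the mutex is NOT held here
        mutex.acquireUninterruptibly();
        T item;
        try {
            item = items.remove();
        } finally {
            mutex.release();
        }
        emptyCount.release();           // wake a waiting producer
        return item;
    }
}
```

Because neither method ever blocks on fillCount or emptyCount while it holds the mutex, the other thread can always get in to release the semaphore being waited on.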

2
votes

You might consider using a BlockingQueue instead, which takes care of the mutex locking and waiting for you.
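
For example, a rough sketch with java.util.concurrent.ArrayBlockingQueue might look like the following. The class name BlockingQueueDemo, the transfer method, and the capacity of 10 are assumptions made for illustration; put() blocks while the queue is full and take() blocks while it is empty.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo; the class and method names are illustrative only.
class BlockingQueueDemo {
    // Sends 0..count-1 from a producer thread to the calling thread
    // and returns the sum of everything received.
    static int transfer(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < count; i++) {
            sum += queue.take(); // blocks while the queue is empty
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(100)); // prints 4950
    }
}
```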

As an aside, I have an old page that demonstrates producer/consumer race conditions (as opposed to the spurious interrupt). My implementation does not use semaphores, though, so I'm not sure it will help you:

http://256stuff.com/gray/docs/misc/producer_consumer_race_conditions/