2
votes

I've got the following code:

    while(!currentBoard.boardIsValid()){
        for (QueueLocation location : QueueLocation.values()){
            while(!inbox.isEmpty(location)){
                Cell c = inbox.dequeue(location);
                notifyNeighbours(c.x, c.y, c.getCurrentState(),previousBoard);
            }
        }
    }

I've got a consumer with a few queues (all of their methods are synchronised), one queue for each producer. The consumer loops over all the queues and checks whether they've got a task for him to consume. If the queue he's checking has a task in it, he consumes it. Otherwise, he goes on to check the next queue until he finishes iterating over all the queues.

As of now, if he iterates over all the queues and they're all empty, he keeps on looping rather than waiting for one of them to contain something (as seen by the outer while).

How can I make the consumer wait until one of the queues has something in it?

I'm having an issue with the following scenario: Let's say there are only 2 queues. The consumer checked the first one and it was empty. Just as he's checking the second one (which is also empty), the producer puts something in the first queue. As far as the consumer is concerned, the queues are both empty and so he should wait (even though one of them isn't empty anymore and he should continue looping).

Edit: One last thing. This is an exercise for me. I'm trying to implement the synchronisation myself, so if any of the Java libraries already has a solution for this, I'm not interested in it. I'm trying to understand how I can implement this.

3
Unless you can modify the producers to do an additional signal/notify, there are two types of solution: 1) use a timeout and find a trade-off between CPU consumption and responsiveness, or 2) use additional threads that wait on the queues and notify the consumer. The additional threads of the second approach will consume more memory but no significant CPU time, as they are mostly waiting. – Holger

3 Answers

1
votes

@Abe was close. I would use wait and notify, the Object class built-ins, as they are the lightest weight.

Object sync = new Object();  // Can use an existing object if there's an appropriate one

// On submit to queue
synchronized ( sync ) {
    queue.add(...);  // Must be inside to avoid a race condition
    sync.notifyAll();
}

// On check for work in queue
synchronized ( sync ) {
    item = null;
    while ( item == null ) {
        // Need to check all of the queues - if there will be a large number, this will be slow,
        // and slow critical sections (synchronized blocks) are very bad for performance
        item = getNextQueueItem();
        if ( item == null ) {
            sync.wait();
        }
    }
}

Note that sync.wait() releases the lock on sync while it waits and reacquires it before returning. The lock on sync must be held to call wait() at all; otherwise it throws IllegalMonitorStateException (it's a reminder to the programmer that some type of critical section is really needed for this to work reliably).
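To make the pattern above concrete, here is a minimal self-contained sketch. The names `MultiQueueInbox`, `put`, `take` and the demo class are hypothetical, not from the question, and the queues are plain `ArrayDeque`s guarded only by the shared `sync` monitor. Because the scan of all queues and the `wait()` happen under the same lock that producers take before adding, a producer cannot slip an item into an already-checked queue between the scan and the wait.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical helper: several producer queues guarded by one shared monitor. */
class MultiQueueInbox<T> {
    private final Object sync = new Object();
    private final List<Queue<T>> queues = new ArrayList<>();

    MultiQueueInbox(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    /** Producer side: add under the shared lock, then wake any waiting consumer. */
    void put(int queueIndex, T item) {
        synchronized (sync) {
            queues.get(queueIndex).add(item);
            sync.notifyAll();
        }
    }

    /** Consumer side: blocks until some queue is non-empty, then returns its head. */
    T take() throws InterruptedException {
        synchronized (sync) {
            while (true) {
                for (Queue<T> q : queues) {
                    T item = q.poll(); // null means this queue is empty
                    if (item != null) {
                        return item;
                    }
                }
                // Every queue was empty; wait() releases sync so producers can get in.
                sync.wait();
            }
        }
    }
}

class MultiQueueInboxDemo {
    /** Starts two producers that each add one item, then consumes both and sums them. */
    static int consumeTwo() {
        MultiQueueInbox<Integer> inbox = new MultiQueueInbox<>(2);
        Thread p0 = new Thread(() -> inbox.put(0, 1));
        Thread p1 = new Thread(() -> inbox.put(1, 2));
        p0.start();
        p1.start();
        try {
            return inbox.take() + inbox.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeTwo()); // prints 3
    }
}
```

The loop around `wait()` also covers spurious wakeups: the consumer only returns once a queue has actually yielded an item.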

By the way, I would recommend a queue dedicated to the consumer (or group of consumers) rather than a queue dedicated to the producer, if feasible. It will simplify the solution.
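As a rough illustration of why a single consumer-owned queue is simpler, here is a hand-rolled sketch. `ConsumerInbox` and the demo class are hypothetical names, and it assumes every producer can be handed a reference to the same inbox. With one queue there is only one emptiness check, so the queue's own monitor is enough.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical hand-rolled blocking queue owned by the consumer and shared by all producers. */
class ConsumerInbox<T> {
    private final Queue<T> items = new ArrayDeque<>();

    /** Called by any producer: add the item and wake the consumer if it is waiting. */
    synchronized void put(T item) {
        items.add(item);
        notifyAll();
    }

    /** Called by the consumer: blocks until an item is available. */
    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait(); // releases this object's monitor while waiting
        }
        return items.remove();
    }
}

class ConsumerInboxDemo {
    /** One producer thread submits an item; the calling thread consumes it. */
    static String roundTrip() {
        ConsumerInbox<String> inbox = new ConsumerInbox<>();
        Thread producer = new Thread(() -> inbox.put("cell(1,2)"));
        producer.start();
        try {
            return inbox.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // prints cell(1,2)
    }
}
```

If the consumer needs to know which producer an item came from, the item itself can carry that information instead of the queue it arrived on.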

0
votes

If you want to block across multiple queues, then one option is to use Java's Lock and Condition objects and then use the signal method.

So whenever the producer has data, it should invoke signalAll().

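import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Lock fileLock = new ReentrantLock();
Condition condition = fileLock.newCondition();
...
// producer has to signal; the lock must be held, or signalAll() throws IllegalMonitorStateException
fileLock.lock();
try { condition.signalAll(); } finally { fileLock.unlock(); }
...
// consumer has to await, also while holding the lock
fileLock.lock();
try { condition.await(); } finally { fileLock.unlock(); }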

This way only when the signal is provided will the consumer go and check the queues.
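For a fuller picture, here is a minimal sketch of how the pieces might fit together. `ConditionInbox`, `put`, `take` and `pollAny` are hypothetical names, and the sketch assumes the producers add to the queues while holding the same lock. The consumer checks the queues before awaiting and re-checks after every wakeup, so a signal sent before it started waiting is not lost.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper: several queues guarded by one Lock and one Condition. */
class ConditionInbox<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition workAvailable = lock.newCondition();
    private final List<Queue<T>> queues = new ArrayList<>();

    ConditionInbox(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    /** Producer side: add and signal while holding the lock. */
    void put(int queueIndex, T item) {
        lock.lock();
        try {
            queues.get(queueIndex).add(item);
            workAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Consumer side: await in a loop until some queue yields an item. */
    T take() throws InterruptedException {
        lock.lock();
        try {
            T item;
            while ((item = pollAny()) == null) {
                workAvailable.await(); // releases the lock while waiting
            }
            return item;
        } finally {
            lock.unlock();
        }
    }

    /** Returns the head of the first non-empty queue, or null; caller must hold the lock. */
    private T pollAny() {
        for (Queue<T> q : queues) {
            T item = q.poll();
            if (item != null) {
                return item;
            }
        }
        return null;
    }
}

class ConditionInboxDemo {
    /** Two producers each add one item; the calling thread consumes both and sums them. */
    static int consumeTwo() {
        ConditionInbox<Integer> inbox = new ConditionInbox<>(2);
        Thread p0 = new Thread(() -> inbox.put(0, 10));
        Thread p1 = new Thread(() -> inbox.put(1, 20));
        p0.start();
        p1.start();
        try {
            return inbox.take() + inbox.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeTwo()); // prints 30
    }
}
```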

0
votes

I solved a similar situation along the lines of what @Abe suggests, but settled on using a Semaphore in combination with an AtomicBoolean and called it a BinarySemaphore. It does require the producers to be modified so that they signal when there is something to do.
Below is the code for the BinarySemaphore and a general idea of what the consumer work-loop should look like:

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class MultipleProdOneConsumer {

BinarySemaphore workAvailable = new BinarySemaphore();

class Consumer {

    volatile boolean stop;

    void loop() {

        while (!stop) {
            doWork();
            if (!workAvailable.tryAcquire()) {
                // waiting for work
                try {
                    workAvailable.acquire();
                } catch (InterruptedException e) {
                    if (!stop) {
                        // log error
                    }
                }
            }
        }
    }

    void doWork() {}

    void stopWork() {
        stop = true;
        workAvailable.release();
    }
}

class Producer {

    /* Must be called after work is added to the queue/made available. */
    void signalSomethingToDo() {
        workAvailable.release();
    }
}

class BinarySemaphore {

    private final AtomicBoolean havePermit = new AtomicBoolean();
    private final Semaphore sync;

    public BinarySemaphore() {
        this(false);
    }

    public BinarySemaphore(boolean fair) {
        sync = new Semaphore(0, fair);
    }

    public boolean release() {

        boolean released = havePermit.compareAndSet(false, true);
        if (released) {
            sync.release();
        }
        return released;
    }

    public boolean tryAcquire() {

        boolean acquired = sync.tryAcquire();
        if (acquired) {
            havePermit.set(false);
        }
        return acquired;
    }

    public boolean tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException {

        boolean acquired = sync.tryAcquire(timeout, tunit);
        if (acquired) {
            havePermit.set(false);
        }
        return acquired;
    }

    public void acquire() throws InterruptedException {

        sync.acquire();
        havePermit.set(false);
    }

    public void acquireUninterruptibly() {

        sync.acquireUninterruptibly();
        havePermit.set(false);
    }

}

}