2
votes
  • The producer is finite, as should be the consumer.
    • The problem is when to stop, not how to run.
  • Communication can happen over any type of BlockingQueue.
    • Can't rely on poisoning the queue(PriorityBlockingQueue)
    • Can't rely on locking the queue(SynchronousQueue)
    • Can't rely on offer/poll exclusively(SynchronousQueue)
    • Probably even more exotic queues in existence.

Creates a queued seq on another (presumably lazy) seq s. The queued seq will produce a concrete seq in the background, and can get up to n items ahead of the consumer. n-or-q can be an integer n buffer size, or an instance of java.util.concurrent BlockingQueue. Note that reading from a seque can block if the reader gets ahead of the producer.

http://clojure.github.com/clojure/clojure.core-api.html#clojure.core/seque

My attempts so far + some tests: https://gist.github.com/934781

Solutions in Java or Clojure appreciated.

2
In Java, I would just use an ExecutorService or a class which wraps it to handle all the event types you need as it can do all the events you mention and much more. - Peter Lawrey

2 Answers

0
votes
class Reader {

    private final ExecutorService ex = Executors.newSingleThreadExecutor();
    private final List<Object> completed = new ArrayList<Object>();
    private final BlockingQueue<Object> doneQueue = new LinkedBlockingQueue<Object>();
    private int pending = 0;

    public synchronized Object take() {
        removeDone();
        queue();
        Object rVal;
        if(completed.isEmpty()) {
            try {
                rVal = doneQueue.take();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            pending--;
        } else {
            rVal = completed.remove(0);
        }
        queue();
        return rVal;
    }

    private void removeDone() {
        Object current = doneQueue.poll();
        while(current != null) {
            completed.add(current);
            pending--;
            current = doneQueue.poll();
        }
    }

    private void queue() {
        while(pending < 10) {
            pending++;
            ex.submit(new Runnable() {

                @Override
                public void run() {
                    doneQueue.add(compute());
                }

                private Object compute() {
                    //do actual computation here
                    return new Object();
                }
            });
        }
    }
}
0
votes

Not exactly an answer I'm afraid, but a few remarks and more questions. My first answer would be: use clojure.core/seque. The producer needs to communicate end-of-seq somehow for the consumer to know when to stop, and I assume the number of produced elements is not known in advance. Why can't you use an EOS marker (if that's what you mean by queue poisoning)?

If I understand your alternative seque implementation correctly, it will break when elements are taken off the queue outside your function, since channel and q will be out of step in that case: channel will hold more #(.take q) elements than there are elements in q, causing it to block. There might be ways to ensure channel and q are always in step, but that would probably require implementing your own Queue class, and it adds so much complexity that I doubt it's worth it.

Also, your implementation doesn't distinguish between normal EOS and abnormal queue termination due to thread interruption - depending on what you're using it for you might want to know which is which. Personally I don't like using exceptions in this way — use exceptions for exceptional situations, not for normal flow control.