1
votes

In producer-consumer model (one thread producing items, putting it in a blocking queue and another thread consuming them in an endless loop) it's recommended to shutdown the threads by sending a poison object which causes the loop to break.

Afaik, this can be done by using:

  • null (discouraged in general and forbidden by BlockingQueue)
  • a value which won't ever be created by the producer (requires extra testing and maybe vaporizing some brain cells)
  • a wrapper type and put every produced value in a class which contains the value as only property and is set to null in order to indicate shutdown (e.g. Optional of the Guava framework) (requires expensive allocation of an object for every produced item)
  • an extra property added to the class which is used as item type (makes the model quite unflexible)
  • a subclass of the type created by producer which is only used for the poison object (doesn't work with final classes, requires an extra class which can be private and doesn't consume noticable space, but is a drawback)
  • interrupting the thread doesn't allow to clear the queue.

I'm looking for a type-safe solution for arbitrary queue items.

2
Why not use one single specific static object which you can check for reference-equality? Like in your Producer you have public static object POISON = new object(); and in your consumer you check while (objFromQueue != Producer.POISON) { do your stuff }...Fildor
Why won't you interrupt producer and consumer?SpaceTrucker
The easiest one for you to understand. You're shutting the queue down. What difference will a few nanoseconds make if you do find some "most efficient" means?Andrew Henle
You can also use an instance of your Queue Item class. The point is that its value is irrelevant for your Poison-Check. OR you could have in your Item class a boolean field "isPoison" that will be false by default and set to true for the poison-instance. Or are you using primitives as Queue Items? Like Integer or so?Fildor
Interruption doesn’t prevent clearing the queue (per se). Interruption only has an effect if the thread is in a waiting state (ending the wait) or does actively query the interruption state. As long as the queue isn’t empty, retrieving an item won’t put the consumer thread into wait state, so interruption doesn’t prevent clearing the queue. Only if the consumer does other wait operations or actively queries the interruption state, it’s up to the consumer to drain the queue after detecting an interruption, before terminating.Holger

2 Answers

0
votes

OK, here's what I would do:

I would define an interface that all messages would need to implement:

public interface Message {
    public default boolean continueProcessing() {
        return true;
    }
}

Then a Poison enum class:

public enum Poison implements Message {
    INSTANCE;

    @Override
    public boolean continueProcessing() {
        return false;
    }
}

The consumer code would look like this:

@Override
public void run() {
    while (true) {
        Message msg = queue.take();
        if (!msg.continueProcessing()) {
            break;
        }
        doSomethingWith(msg);
    }
}
0
votes

You can use a Poison Pill. The value does not matter:

Producer:

class Producer{
    public static final ItemType POISON = new ItemType();

    // rest goes here
}

Consumer:

class Consumer{
    void run(){
         for(;;){
              ItemType item = queue.take();
              if( item == Producer.POISON ) // <- not checking value of item here!
                  break;
              handleItem( item );
         }
    }
}

This also works with types like Integer that may be cached if you make sure your POISON is not cached.

Without knowing what your ItemType actually is, this is the best (IMHO) I can suggest to you.


Another idea that just came to my mind would be to introduce a field on the Producer that indicates that no more Items are to be expected after the queue has been emptied. This would imply an additional check on empty queue in the consumer code in some way. It would also mean that you must not block on empty queue ... so a lot to consider when implementing this.