7
votes

If I have a situation like the following:

ObserverA, ObserverB, ObserverC all inherit from AbstractObserver.

I create a list of observers:

List<AbstractObserver> list = new ArrayList<AbstractObserver>();
list.add(new ObserverA());
list.add(new ObserverB());
list.add(new ObserverC());

And some kind of handler with the following methods runs in a "MAIN" thread:

public void eat(Food item) {
     for(AbstractObserver o : list) {
          o.eatFood(item);
     }
}

public void drink(Coffee cup) {
     for(AbstractObserver o : list) {
          o.drinkCoffee(cup);
     }
}

How would I design a system where I could run each eatFood and drinkCoffee method of the observers in different threads? Specifically, how would I run the eatFood or drinkCoffee method in ObserverA, ObserverB, and ObserverC in their own threads when the "MAIN" thread receives an event (drink or eat methods get called)?

I'd like to have different threads for each AbstractObserver subclass instance because, currently, I'm notifying each observer sequentially which could cause delays.

3
I don't understand how the objects being observers plays a role here. What are the subjects they observe and how do they relate to what the eatFood and drinkCoffe method do?Philipp
No, not for me. From my perspective, I don't know when the eat and drink (the MAIN thread methods) will be called. They could be called five minutes apart or more. I just want each individual AbstractObserver subclass instance to be able to run drinkCoffee or eatFood concurrently when the MAIN thread's eat or drink methods get called.sneeze1
Do you have any restriction on concurrency? For example, can subsequent o.eatFood() on the same object o be invoked before previous o.eatFood() ended?Alexei Kaigorodov

3 Answers

4
votes

Making some simplifying assumptions here that you don't care about getting notified when the eating/drinking finishes, you could also use the executor framework to throw the work onto a queue:

  // declare the work queue
   private final Executor workQueue = Executors.newCachedThreadPool(); 




   // when you want to eat, schedule a bunch of 'eating' jobs
       public void eat(final Food item){
          for (final AbstractObserver o: list) {
             workQueue.execute(new Runnable() {

                @Override
                public void run() {
                   o.eatFood(item); // runs in background thread
                }
             });
          }
       }

On exit to your program, you must shut down the executor:

   workQueue.shutdown();
2
votes

I'm not a pro at this, but perhaps you could use a Producer-consumer set up. Here the producer, which is the observed entity, could add a notification on a queue in its own thread, which the consumer, the observer here, would get from the same queue, but on its own thread.

2
votes

To elaborate on Hovercraft's answer, a basic implementation of your observer could look like this:

class ObserverA implements Runnable {
    private final BlockingQueue<Food> queue = new ArrayBlockingQueue<> ();

    public void eatFood(Food f) {
        queue.add(f);
    }

    public void run() {
        try {
            while (true) {
                Food f = queue.take(); //blocks until some food is on the queue
                //do what you have to do with that food
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            //and exit
        }
    }
}

So your code that calls eatFood would return immediately from that method and not block your main thread.

You obviously need to allocate a thread to the observer, either directly: new Thread(observerA).start(); or through an ExecutorService, which is probably easier and preferable.


Alternatively, you can create the threads at the "observed" object level:

private static final ExecutorService fireObservers = Executors.newFixedThreadPool(10);

public void eat(Food food) {
    for (AbstractObserver o : observers) {
        //(i)  if an observer gets stuck, the others can still make progress
        //(ii) if an observer throws an exception, a new thread will be created
        Future<?> f = fireObservers.submit(() -> o.dataChanged(food));
        fireObservers.submit(new Callable<Void>() {
            @Override public Void call() throws Exception {
                try {
                    f.get(1, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    logger.warn("Slow observer {} has not processed food {} in one second", o, food);
                } catch (ExecutionException e) {
                    logger.error("Observer " + o + " has thrown exception on food " + food, e.getCause());
                }
                return null;
            }
        });
    }
}

(I mostly copied pasted from here - you probably need to adapt it to your needs).