2
votes

How to create asynchronous messages from worker threads to the managing thread. The framework is below. In the snippet, Worker implements Runnable, and threadManager is ExecutorService. The Worker threads are long running, and should pass periodic progress messages up to the manager. One option is to use a blocking queue, but I haven't done that before.

    RunnableTester_rev2.threadManager = Executors.newFixedThreadPool( nThreadsAllowed );

    final List<Worker> my_tasks = new ArrayList<Worker>();
    for ( int i = 0; i < nThreadsToRun; i++ )
    {
        // The Worker thread is created here.
        // Could pass a blocking queue to handle messages.
        my_tasks.add( new Worker( ) );
    }

    final List<Future<?>> fList = new ArrayList<Future<?>>();

    for ( final Worker task : my_tasks )
    {
        fList.add( RunnableTester_rev2.threadManager.submit( task ) );
    }

    RunnableTester_rev2.threadManager.shutdown();


    /**
     * Are we all done
     */
    boolean isDone = false;

    /**
     * Number of threads finished
     */
    int nThreadsFinished = 0;

    /**
     * Keep processing until all done
     */
    while ( !isDone )
    {
        /*
         * Are all threads completed?
         */
        isDone = true;
        int ii = 0;
        nThreadsFinished = 0;
        for ( final Future<?> k : fList )
        {
            if ( !isThreadDoneList.get( ii ) )
            {
                final boolean isThreadDone = k.isDone();
                if ( !isThreadDone )
                {
                    // Reduce printout by removing the following line
                    // System.out.printf( "%s is not done\n", mywks.get( ii ).getName() );
                    isDone = false;
                }
            }
            else
            {
                nThreadsFinished++;
            }
            ii++;
        }

        /*
        *  WOULD LIKE TO PROCESS Worker THREAD MESSAGES HERE
        */

    }
2

2 Answers

1
votes

That's really complicated. I would have done something like this.

// no need to use a field.
ExecutorService threadManager = Executors.newFixedThreadPool( nThreadsAllowed );
List<Future<Result>> futures = new ArrayList<Future<Result>>();
for ( int i = 0; i < nThreadsToRun; i++ )
    // Worker implements Callable<Result>
    futures.add(threadManager.submit(new Worker( )));
threadManager.shutdown();
threadManager.awaitTermination(1, TimeUnit.MINUTE);
for(Future<Result> future: futures) {
    Result result = future.get();
    // do something with the result
}
0
votes

I'd use a BlockingQueue. You could try something like this:

final BlockingQueue<Message> q = new LinkedBlockingQueue<Message>();
// start all the worker threads,
// making them report progress by adding Messages into the queue
for (...) {
  threadManager.submit(new Runnable() {
    // initialize the worker
    q.offer(new Message("10% completed"));
    // do half of the work
    q.offer(new Message("50% completed"));
    // do the rest of the work
    q.offer(new Message("100% completed"));
  });
}
// after starting all the workers, call a blocking retrieval method
// on the queue, waiting for the messages
while (!(everything is done)) {
  Message m = q.take();
  // process the message
}

The idea is that all the worker threads and the main thread share a common blocking queue. The worker threads are the producers, and the main thread is the consumer.

Here I use .offer(...), but you could use .add(...) and you'd get the same in this case. The difference would be that the former would return false if the queue is full, while the latter would throw an exception. It can't happen here, since I've instantiated an unbounded queue (you could obviously get an OutOfMemoryError). If you use .put(...), however, you'd have to deal with an unchecked InterruptedException (which is not useful in this case: it can't get thrown because the queue is never full -- it is unbounded --, so the method will never block).

Obviously, you can improve it by refactoring the calls to q.offer(...) into a reportStatus(...) method, or something like that.


Maybe you want something different than what I proposed above. Maybe you want the main thread to get asynchronous messages only when each worker thread finishes its work.

My solution is similar that the one proposed by Peter Lawrey, the difference being that, with this solution, the managing thread will get notified as soon as any of worker thread finishes (while with Peter's solution, the result from the worker threads will be retrieved in order -- so, if the first worker thread takes forever to run, even if all the other worker threads finish their jobs you won't know it). Depending on what you are doing, you could get increased performance.

Note that with this solution you cannot infer which thread finished its work (to do so, your Worker class would have to have a method like .getJobId() or something like that), while with Peter's solution you will know exactly which thread finished its work (because you are iterating in the futures list in the same order you instantiated and started the Workers).

If this is the case, you can use an ExecutorCompletionService. It encapsulates an ExecutorService and provides a .take() method (blocking retrieval) that will return Futures from the executor service right after they've completed:

ExecutorCompletionService<Worker> ecs = new ExecutorCompletionService(executorService);
// Worker has to be a callable
// Now you submit your tasks to the ecs:
for (...) {
  ecs.submit(new Worker(...));
}
// Now you block on .take():
while (!(everything is done)) {
  Future<Worker> f = ecs.take();
  // f.get(), for example, to check the results
}

The CompletionService guarantees that, once you take a Future, a call to its .get() method will complete immediately: either by returning the result, or by throwing an ExecutionException, encapsulating the exception thrown by the Callable.