0
votes

I was reading a little on producer consumer implementation in Java. Most of the online code exampled showed the solution with the following basic object:

A blocking queue to use as the data pipe

n number of Producer which pushed messaged into the Queue

n number of consumer which are polling for data from the queue

Normally the consumers are submitted to a thread pool which completes the implementation.

Another option I investigated, that had little to none representation online is simply submitting the task as a runnable object to a FixedThreadPool.
In this implementation, the business logic for handling a message is implemented in a run() method of a Runnable object. When we have a new message we would like to process we can simply submit a new task of that type to a FixedThreadPool, and... that's it. The number of running threads is managed by the FixedThreadPool implementation along with the logic of polling for messages, all that is left for us to implement is the buisness logic for our use case.
Can anyone explain why is this solution overlooked?
Is there any specific reason we need to use a blocking queue and polling when the java language already implemented that for us?

public class ProducerConsumerExample{

    private ExecutorService pool;

    public ProducerConsumerExample(int numberOfThreads){
        this.pool = Executors.newFixedThreadPool(numberOfThreads);
    }

    public void submit(MessageObject msg){
        pool.submit(new MessagePrinter(msg));
    }

}

public class MessagePrinter implements  Runnable{
    private MessageObject msg;

    public MessagePrinter(MessageObject msg){
        this.msg = msg;
    }

    @Override
    public void run() {
        //only need to implement logic that is releavent for our use case
        System.out.println("Message recieved " + msg.toString());
    }
}

public static void main(String[] args){
    ProducerConsumerExample ex = new ProducerConsumerExample(5);
    for(int i=0;i<WHATEVER;i++){
        ex.submit(new MessageObject());
    }
}
2
Re, "...why is this solution overlooked?" What makes you think that it has been overlooked? Your question says that you were reading about the producer/consumer pattern. Giving tasks to a thread pool is a different pattern. It is possible that there are many problems that could be solved either way, but if you go looking specifically for information about producer/consumer, then you should not be surprised if you don't find much information about how to make the most effective use of thread pools.Solomon Slow
P.S.; Using a thread pool to start long-running "consumer" threads is somewhat a case of fitting a square peg into a round hole. What you're really doing in that case is using the thread pool as a substitute for a thread factory. The Java standard library defines a ThreadFactory interface, and ways to get a system-default ThreadFactory. Check it out!Solomon Slow

2 Answers

0
votes

nothing wrong with your approach, except that it is not obvious what queue is used in the pool created by Executors.newFixedThreadPool(). Are you sure you cannot get RejectedExecutionException if the number of task become considerable?

So better use new explicit creation of the task queue, with predefined size. Look at the source code of the Executors.newFixedThreadPool(int).

0
votes

Actually, it is preferred to use thread pools in producer-consumer applications. From Java Concurrency in Practice:

Executor is based on the producer-consumer pattern, where activities that submit tasks are the producers (producing units of work to be done) and the threads that execute tasks are the consumers (consuming those units of work). Using an Executor is usually the easiest path to implementing a producer-consumer design in your application.

Things should be considered when choosing an implementation:

  • Coupling: How much can a producer and consumer know about each other and about the channel connects them together?
  • Underlying Queue Implementation: What type of Queue would be perfect for the job?
  • Life Cycle: What would happen if the ThreadPool was corrupted or shutdown?
  • Thread Management and Starvation: How much are you involved in managing your threads? Long-lived vs short-lived objects?