You should tune the thread pools for the producer and the consumer separately. For example, if the consumer is much faster than the producer, the consumer's thread pool can be given fewer threads than the producer's. Matching thread counts to the relative speed of each stage can noticeably improve throughput, so tune the ratio of producer to consumer threads (for example 3:1) and measure the result.
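To make the ratio idea concrete, here is a minimal sketch, assuming a simple two-stage setup in which the producers share a fixed budget of items and the consumers merely drain them. The class `RatioPipeline`, its `run` method and the queue capacity of 1024 are hypothetical, chosen only for illustration; `main` times a few producer:consumer ratios so you can compare items per second on your own machine.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: producers and consumers get independently sized pools,
// so the producer:consumer thread ratio (e.g. 3:1) can be tuned and timed.
class RatioPipeline {

    // Runs `producerThreads` producers that together emit `totalItems` integers
    // and `consumerThreads` consumers that drain them; returns items consumed.
    static int run(int producerThreads, int consumerThreads, int totalItems) {
        // Bounded queue (capacity is an arbitrary choice): producers block when
        // consumers fall behind instead of growing memory without limit.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1024);
        AtomicInteger budget = new AtomicInteger(totalItems);
        AtomicInteger consumed = new AtomicInteger();
        CountDownLatch allConsumed = new CountDownLatch(totalItems);

        ExecutorService producers = Executors.newFixedThreadPool(producerThreads);
        ExecutorService consumers = Executors.newFixedThreadPool(consumerThreads);

        for (int i = 0; i < producerThreads; i++) {
            producers.submit(() -> {
                int item;
                // Each producer claims items until the shared budget runs out.
                while ((item = budget.getAndDecrement()) > 0) {
                    queue.put(item);
                }
                return null;
            });
        }
        for (int i = 0; i < consumerThreads; i++) {
            consumers.submit(() -> {
                try {
                    while (true) {
                        queue.take(); // real code would process the item here
                        consumed.incrementAndGet();
                        allConsumed.countDown();
                    }
                } catch (InterruptedException e) {
                    // shutdownNow() below interrupts consumers blocked in take()
                    Thread.currentThread().interrupt();
                }
                return null;
            });
        }

        try {
            allConsumed.await(); // wait until every item has been consumed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            producers.shutdownNow();
            consumers.shutdownNow();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        int items = 200_000;
        int[][] ratios = {{1, 1}, {3, 1}, {1, 3}};
        for (int[] r : ratios) {
            long start = System.nanoTime();
            int n = run(r[0], r[1], items);
            double seconds = (System.nanoTime() - start) / 1e9;
            System.out.printf("%d:%d producers:consumers -> %.0f items/s%n",
                    r[0], r[1], n / seconds);
        }
    }
}
```

Because the "work" in this sketch is trivial, the timings mostly reflect queue contention; substitute your real producing and consuming code before drawing conclusions about the best ratio.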
Along the same lines, you could use three thread pools: the Producer (reader) and Consumer pools get fewer threads, while the decoder (transformer) pool gets more. I would start with pools of size 1 for the Producer and Consumer and size 5 for the transformer (decoder), then measure where the bottleneck is and whether the throughput meets your expectations. I am not sure whether you need code examples; if you want more specific advice, share what you currently have.
```java
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerDecoderConsumer {

    // Starting point for tuning: 1 reader, 5 decoders, 1 writer.
    private static final int DECODER_THREADS = 5;
    // Bounded queues apply back-pressure so a fast stage cannot exhaust memory.
    private static final int QUEUE_CAPACITY = 10_000;

    public static void main(String[] args) {
        // FIFO, bounded queues. A PriorityBlockingQueue is unbounded and
        // reorders elements by natural ordering instead of arrival order.
        BlockingQueue<Integer> inputQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

        ExecutorService reader = Executors.newSingleThreadExecutor();
        reader.submit(new Producer(inputQueue));

        // Submit one Transformer per thread; a single task would keep only
        // one of the pool's threads busy.
        ExecutorService decoder = Executors.newFixedThreadPool(DECODER_THREADS);
        for (int i = 0; i < DECODER_THREADS; i++) {
            decoder.submit(new Transformer(inputQueue, outputQueue));
        }

        ExecutorService writer = Executors.newSingleThreadExecutor();
        writer.submit(new Consumer(outputQueue));
        // The pipeline runs until stopped; call shutdownNow() on each executor to stop it.
    }

    private static class Producer implements Callable<Void> {
        private final BlockingQueue<Integer> queue;

        Producer(final BlockingQueue<Integer> pQueue) {
            queue = pQueue;
        }

        @Override
        public Void call() {
            Random random = new Random();
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.put(random.nextInt()); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore status and exit
            }
            return null;
        }
    }

    private static class Transformer implements Callable<Void> {
        private final BlockingQueue<Integer> inputQueue;
        private final BlockingQueue<String> outputQueue;

        Transformer(final BlockingQueue<Integer> pInputQueue, final BlockingQueue<String> pOutputQueue) {
            inputQueue = pInputQueue;
            outputQueue = pOutputQueue;
        }

        @Override
        public Void call() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Integer input = inputQueue.take();
                    String output = String.valueOf(input); // decode input to output
                    outputQueue.put(output);                // hand off to the writer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }

    private static class Consumer implements Callable<Void> {
        private final BlockingQueue<String> queue;

        Consumer(final BlockingQueue<String> pQueue) {
            queue = pQueue;
        }

        @Override
        public Void call() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    System.out.println(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }
}
```
I have added some code to illustrate the idea. Unlike the single queue mentioned in your question, I use two blocking queues, because I don't expect the extra queue to add noticeable overhead; a profiler is the right tool to confirm that. I hope you find it useful, and you can retrofit it to the single-queue model if you really feel the need.
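Before reaching for a profiler, a cheap way to find the bottleneck mentioned above is to sample the queue depths while the pipeline runs. This is a minimal sketch, assuming bounded queues of known capacity; the class `QueueDepthProbe`, its methods and the 90% "nearly full" threshold are hypothetical, and the classification is a rough heuristic rather than a definitive diagnosis.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: samples the two queues of a reader -> decoder -> writer
// pipeline and guesses which stage is limiting throughput.
class QueueDepthProbe {

    // Heuristic (an assumption, not a rule): a queue that stays nearly full
    // means the stage reading from it cannot keep up.
    static String diagnose(int inputDepth, int outputDepth, int capacity) {
        int nearlyFull = capacity * 9 / 10; // 90% threshold is arbitrary
        if (outputDepth >= nearlyFull) {
            return "Consumer";    // writer lags; decoders block on a full output queue
        }
        if (inputDepth >= nearlyFull) {
            return "Transformer"; // decoders lag behind the reader
        }
        return "Producer";        // queues stay short: downstream stages are starved
    }

    // Prints one sample per second until the returned executor is shut down.
    static ScheduledExecutorService start(BlockingQueue<?> input, BlockingQueue<?> output, int capacity) {
        ScheduledExecutorService probe = Executors.newSingleThreadScheduledExecutor();
        probe.scheduleAtFixedRate(() -> {
            int in = input.size();
            int out = output.size();
            System.out.printf("input=%d output=%d likely bottleneck: %s%n",
                    in, out, diagnose(in, out, capacity));
        }, 1, 1, TimeUnit.SECONDS);
        return probe;
    }
}
```

The heuristic only makes sense with bounded queues: an unbounded queue never reports "full", so a lagging stage shows up as a steadily growing depth instead. Adjust the pool sizes, rerun, and watch whether the reported bottleneck moves.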