1
votes

I have a fairly standard pair of producer and consumer threads:

  • producer reads and decodes bytes from a file to a blocking queue.
  • consumer is polling items from the queue

It turns out that the decoding step is the bottleneck and would probably benefit from more CPU: it accounts for 70% of the producer's time. Would I gain any noticeable performance if I introduced a separate "decoder" thread?

  • producer reads bytes from a file to a blocking "object" queue
  • decoder decodes byte objects to items
  • consumer is polling "decoded" items

I need to use a single queue because of the memory footprint. I can't afford two queues (bytes and items), so I guess there will be some object "casting" overhead?

Any ideas on how to implement this three-thread solution?

Thank you!

2
I don't think an additional thread will solve your problem, as it will compete for the same resource (CPU). It can only help if reading the bytes from the file (the I/O operation) takes at least as long as decoding. - abhinav
Can you please explain why you definitely don't want two queues? I would have suggested two queues, raw and decoded, with multiple decoder threads taking from the raw queue and placing results on the decoded queue. - vikingsteve
What kind of decoding do you do? There are plenty of lightning-fast deserialization frameworks (like Avro) that would most likely make the problem all but disappear. By the way, it's hard to believe that decoding takes longer than reading the file itself; are you sure it's not the file reading that is blocking? - C4stor
I'm not sure I understand the memory footprint argument: if you take something off the raw queue and put it on the decoded queue, you're not using any more memory (modulo the difference in size between raw and decoded). - selig
I am decoding bitmaps, so the memory footprint is very important. Image decoding takes more time than reading. I must reuse objects as much as possible to avoid GC and keep memory usage acceptable. I was thinking of something along these lines: the reader hands bytes to an available decoder thread (from a thread pool) and the decoder adds the decoded object to the queue. I also need to preserve the queue order. - hpet

2 Answers

0
votes

You should tune the thread pools for the producer and the consumer. For example, if the consumer is much faster than the producer, its thread pool can be given fewer threads than the producer's. This should increase throughput. The ratio of producer to consumer threads is the thing to tune (for example 3:1).

Along the same lines, you could have three thread pools in which the producer (reader) and the consumer have fewer threads, while the decoder (transformer) pool has more. I am not sure whether you need code examples; if you do, you should share what you currently have. I would start with thread pools of size 1 for the producer and the consumer and size 5 for the transformer (decoder), then measure where the bottleneck is and whether the throughput meets your expectations.

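import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducerDecoderConsumer {
    // Illustrative sizes; tune them after measuring.
    private static final int QUEUE_CAPACITY = 16;
    private static final int DECODER_THREADS = 5;

    public static void main(String[] args) {
        // Bounded FIFO queues: put() blocks when a queue is full, which caps memory use.
        // (A PriorityBlockingQueue is unbounded, so put() would never block.)
        BlockingQueue<Integer> inputQueue = new ArrayBlockingQueue<Integer>(QUEUE_CAPACITY);
        BlockingQueue<String> outputQueue = new ArrayBlockingQueue<String>(QUEUE_CAPACITY);
        ExecutorService reader = Executors.newSingleThreadExecutor();
        reader.submit(new Producer(inputQueue));
        ExecutorService decoder = Executors.newFixedThreadPool(DECODER_THREADS);
        // One task per pool thread; a single submit() would keep only one thread busy.
        // With several decoder threads, items can reach outputQueue out of order.
        for (int i = 0; i < DECODER_THREADS; i++) {
            decoder.submit(new Transformer(inputQueue, outputQueue));
        }
        ExecutorService writer = Executors.newSingleThreadExecutor();
        writer.submit(new Consumer(outputQueue));
        // The tasks loop forever, so this demo runs until the process is killed.
    }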

    private static class Producer implements Callable<Void> {
        final BlockingQueue<Integer> queue;

        public Producer(final BlockingQueue<Integer> pQueue) {
            queue = pQueue;
        }

        @Override
        public Void call() throws Exception {
            try {
                Random random = new Random();
                while (true) {
                    queue.put(random.nextInt());
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing.
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }

    private static class Transformer implements Callable<Void> {
        final BlockingQueue<Integer> inputQueue;

        final BlockingQueue<String> outputQueue;

        public Transformer(final BlockingQueue<Integer> pInputQueue, final BlockingQueue<String> pOutputQueue) {
            inputQueue = pInputQueue;
            outputQueue = pOutputQueue;
        }

        @Override
        public Void call() throws Exception {
            try {
                while (true) {
                    Integer input = inputQueue.take();
                    String output = String.valueOf(input); // decode input to output
                    outputQueue.put(output); // output
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop decoding.
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }

    private static class Consumer implements Callable<Void> {
        final BlockingQueue<String> queue;

        public Consumer(final BlockingQueue<String> pQueue) {
            queue = pQueue;
        }

        @Override
        public Void call() throws Exception {
            try {
                while (true) {
                    System.out.println(queue.take());
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming.
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }
}

I have added some code to illustrate the idea. I am using two blocking queues rather than the single queue mentioned in your question, as I don't think the extra queue adds any real overhead; I would suggest using a profiler to check whether it does. I hope you find it useful and can retrofit it to the single-queue model should you really feel the need.
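One possible way to retrofit this to a single queue while keeping the order, as the question asks, is to have the reader submit each chunk to a decoder pool and put the resulting Future on one bounded queue. The consumer takes the Futures in read order and waits on each. This is only a sketch under assumptions: it needs Java 8+ for the lambdas, and `decode`, `run`, the fake one-byte chunks, and the sizes are hypothetical stand-ins, not the asker's real decoder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OrderedDecodePipeline {
    // Hypothetical stand-in for the real bitmap decoder.
    static String decode(byte[] raw) {
        return "item-" + raw[0];
    }

    // Reads `count` fake chunks, decodes them on `decoderThreads` threads,
    // and returns the decoded items in the order they were read.
    static List<String> run(int count, int decoderThreads, int capacity) throws Exception {
        ExecutorService decoders = Executors.newFixedThreadPool(decoderThreads);
        ExecutorService reader = Executors.newSingleThreadExecutor();
        // The single bounded queue: it holds pending results in read order.
        BlockingQueue<Future<String>> queue = new ArrayBlockingQueue<>(capacity);

        Callable<Void> readTask = () -> {
            for (int i = 0; i < count; i++) {
                byte[] raw = { (byte) i }; // stands in for bytes read from the file
                Callable<String> decodeTask = () -> decode(raw);
                // Decoding starts immediately; put() blocks while the queue is
                // full, so the number of undelivered items stays bounded.
                queue.put(decoders.submit(decodeTask));
            }
            return null;
        };
        reader.submit(readTask);

        List<String> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // take() yields Futures in read order; get() waits for that item only.
            result.add(queue.take().get());
        }
        reader.shutdown();
        decoders.shutdown();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(5, 4, 2));
    }
}
```

The queue capacity limits how far the reader can run ahead of the consumer, which is what bounds the number of decoded items held in memory at once. A slow item at the head of the queue delays delivery of later items that have already finished decoding; that is the price of keeping the order.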

0
votes

Use two queues. The first holds the undecoded objects, and multiple consumers take from it to decode.

Those consumers decode the objects and write the results to the second queue, from which the ultimate consumers consume.

Make sure to avoid deadlock (use notifyAll(), not notify(), unless you really know what you are doing).
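To illustrate the notifyAll() advice, here is a minimal sketch of a hand-rolled bounded buffer, assuming Java 8+; the class name `BoundedBuffer` is hypothetical. Producers and consumers wait on the same monitor, so a single notify() could wake a thread of the same kind, which would go straight back to waiting while everyone else stays asleep. notifyAll() together with a `while` re-check avoids that.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class BoundedBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    public BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T item) throws InterruptedException {
        // Re-check the condition in a loop: a woken thread may find the buffer full again.
        while (items.size() == capacity) {
            wait();
        }
        items.add(item);
        notifyAll(); // wake every waiter; each one re-checks its own condition
    }

    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll();
        return item;
    }

    public static void main(String[] args) throws Exception {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += buffer.take();
        }
        producer.join();
        System.out.println(sum); // prints 10
    }
}
```

The classes in java.util.concurrent, such as ArrayBlockingQueue, already handle this signalling internally, so a hand-written buffer like this is mainly useful for understanding the pitfall.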