1
votes

I have a system that reads names from a list, calls an external server for a true/false status check and actions those with a true status. the call to the external server take some time so running it all in one thread isn't very efficient.

I am currently trying to implement it as a producer/consumer system where many consumer threads read the names from a list, call the external server, put the valid names in a blocking queue and have a single consumer pick items from the queue and action them. sadly however the system will at times run to completion and will at other times hang indefinitely. Test code is as follows

public class SubscriberTest {
    static Queue<String> subscribed = new ConcurrentLinkedQueue<String>();
    static BlockingQueue<String> valid = new LinkedBlockingQueue<String>(100);
    Random rand = new Random();

    public SubscriberTest(int i) {
        for (int j = 0; j < i; j++) {
            subscribed.add("I love:" + j);
        }
    }

    public SubscriberTest(Queue<String> subs) {
        subscribed = subs;
    }

    public static void main(String[] args) {
        SubscriberTest fun = new SubscriberTest(10000);
        System.out.println(subscribed.size());
        ExecutorService producers = Executors.newCachedThreadPool();
        ExecutorService consumers = Executors.newSingleThreadExecutor();
        Consumer consumer = fun.new Consumer();
        Producer producer = fun.new Producer();
        while (!subscribed.isEmpty()) {
            producers.execute(producer);
            consumers.execute(consumer);
        }
        producers.shutdown();
        consumers.shutdown();
        System.out.println("finally");
    }

    // take names from subscribed and get status
    class Producer implements Runnable {
        public void run() {

            String x = subscribed.poll();
            System.out.println("Producer: " + x + " " + Thread.currentThread().getName());
            try {
                if (getStatus(x)) {
                    valid.put(x);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // this is a call to an external server
        private boolean getStatus(String x) {
            return rand.nextBoolean();
        }
    }

    // takes names from valid queue and save them
    class Consumer implements Runnable {

        public void run() {
            try {
                System.out.println("Consumer: " + valid.take() + " " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
}

Please show me where I go wrong.

2

2 Answers

4
votes
String x = subscribed.poll();

Will return null if nothing is available in the queue, which means that you will try to put a null onto the 'valid' queue, which will cause a null pointer exception, and that particular thread will exit. When this happens with all the threads in the pool, the application will hang.

2
votes

An ExecutorService is a pool of threads with a queue of tasks. Adding another queue just adds complexity and increases the chance you will do something incorrect. I suggest you just use the queue already there.

public class SubscriberTest {
    public static void main(String[] args) throws InterruptedException {
        final ExecutorService consumers = Executors.newSingleThreadExecutor();
        // middle producer
    final ExecutorService producers = Executors.newFixedThreadPool(
                                   Runtime.getRuntime().availableProcessors());
    // subscribed/original producer.
    for (int i = 0; i < 1000*1000; i++) {
            final String task = "I love:" + i;
            producers.execute(new MidProducer(task, consumers));
        }

        producers.shutdown();
        producers.awaitTermination(10, TimeUnit.SECONDS);
        consumers.shutdown();
        System.out.println("finally");
    }


    static class MidProducer implements Runnable {
        private final Random rand = new Random();
        private final String task;
        private final ExecutorService consumers;

        public MidProducer(String task, ExecutorService consumers) {
            this.task = task;
            this.consumers = consumers;
        }

        public void run() {
            System.out.println("Producer: " + task + " " + Thread.currentThread().getName());

            if (getStatus(task))
                consumers.execute(new Consumer(task));
        }

        private boolean getStatus(String x) {
            return rand.nextBoolean();
        }

    }

    static class Consumer implements Runnable {
        private final String task;

        private Consumer(String task) {
            this.task = task;
        }

        public void run() {
            System.out.println("Consumer: " + task + " " + Thread.currentThread().getName());
        }
    }
}

prints

Producer: I love: 1 pool-2-thread-2
Producer: I love: 3 pool-2-thread-4
Producer: I love: 2 pool-2-thread-3
Producer: I love: 5 pool-2-thread-2
Producer: I love: 7 pool-2-thread-2
Producer: I love: 4 pool-2-thread-5
Producer: I love: 6 pool-2-thread-6
Producer: I love: 8 pool-2-thread-7
Producer: I love: 10 pool-2-thread-2
Producer: I love: 9 pool-2-thread-5
Producer: I love: 11 pool-2-thread-8
Producer: I love: 12 pool-2-thread-9
Producer: I love: 14 pool-2-thread-10
Producer: I love: 13 pool-2-thread-2
Producer: I love: 16 pool-2-thread-10
Producer: I love: 15 pool-2-thread-11
Producer: I love: 17 pool-2-thread-12
Producer: I love: 20 pool-2-thread-14
Producer: I love: 19 pool-2-thread-10
Producer: I love: 18 pool-2-thread-13
Producer: I love: 0 pool-2-thread-1
Producer: I love: 22 pool-2-thread-12
Producer: I love: 21 pool-2-thread-15
Producer: I love: 25 pool-2-thread-3
Producer: I love: 27 pool-2-thread-12
Producer: I love: 26 pool-2-thread-10
Producer: I love: 24 pool-2-thread-15
Producer: I love: 28 pool-2-thread-1
Producer: I love: 23 pool-2-thread-16
Producer: I love: 31 pool-2-thread-11
Producer: I love: 30 pool-2-thread-16
Producer: I love: 32 pool-2-thread-1
Producer: I love: 36 pool-2-thread-3
Consumer: I love: 2 pool-1-thread-1

...

Consumer: I love: 9975 pool-1-thread-1
Consumer: I love: 9977 pool-1-thread-1
Consumer: I love: 9978 pool-1-thread-1
Consumer: I love: 9979 pool-1-thread-1
Consumer: I love: 9981 pool-1-thread-1
Producer: I love: 9996 pool-2-thread-16
Consumer: I love: 9984 pool-1-thread-1
Consumer: I love: 9985 pool-1-thread-1
Consumer: I love: 9990 pool-1-thread-1
Consumer: I love: 9992 pool-1-thread-1
Producer: I love: 9997 pool-2-thread-16
Consumer: I love: 9994 pool-1-thread-1
Consumer: I love: 9995 pool-1-thread-1
Consumer: I love: 9996 pool-1-thread-1
Producer: I love: 9998 pool-2-thread-16
Producer: I love: 9999 pool-2-thread-16
Consumer: I love: 9997 pool-1-thread-1
Consumer: I love: 9998 pool-1-thread-1
Consumer: I love: 9999 pool-1-thread-1
finally