I have a system that reads names from a list, calls an external server for a true/false status check and actions those with a true status. the call to the external server take some time so running it all in one thread isn't very efficient.
I am currently trying to implement it as a producer/consumer system where many consumer threads read the names from a list, call the external server, put the valid names in a blocking queue and have a single consumer pick items from the queue and action them. sadly however the system will at times run to completion and will at other times hang indefinitely. Test code is as follows
public class SubscriberTest {
static Queue<String> subscribed = new ConcurrentLinkedQueue<String>();
static BlockingQueue<String> valid = new LinkedBlockingQueue<String>(100);
Random rand = new Random();
public SubscriberTest(int i) {
for (int j = 0; j < i; j++) {
subscribed.add("I love:" + j);
}
}
public SubscriberTest(Queue<String> subs) {
subscribed = subs;
}
public static void main(String[] args) {
SubscriberTest fun = new SubscriberTest(10000);
System.out.println(subscribed.size());
ExecutorService producers = Executors.newCachedThreadPool();
ExecutorService consumers = Executors.newSingleThreadExecutor();
Consumer consumer = fun.new Consumer();
Producer producer = fun.new Producer();
while (!subscribed.isEmpty()) {
producers.execute(producer);
consumers.execute(consumer);
}
producers.shutdown();
consumers.shutdown();
System.out.println("finally");
}
// take names from subscribed and get status
class Producer implements Runnable {
public void run() {
String x = subscribed.poll();
System.out.println("Producer: " + x + " " + Thread.currentThread().getName());
try {
if (getStatus(x)) {
valid.put(x);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// this is a call to an external server
private boolean getStatus(String x) {
return rand.nextBoolean();
}
}
// takes names from valid queue and save them
class Consumer implements Runnable {
public void run() {
try {
System.out.println("Consumer: " + valid.take() + " " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Please show me where I go wrong.