Usually existing SO topics help me get past a problem, but this time I'm stuck.
I want to implement a producer/consumer (P/C) using concurrency in Java, without using the existing APIs, because it is for learning purposes.
My Producers are blocking the Consumer from consuming messages from the queue (Holder), but I want the Producers and the Consumer to use the queue simultaneously.
You can run my sample and you will see that, while a Producer is adding, the Consumer waits for the lock. But I want the Consumer to do its job right after a message is added, not when the Producer tells it to.
I'm surprised that all the examples I found when searching for the P/C pattern work like mine (the producer blocks the consumer), which doesn't make sense to me.
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
class Holder<T> {
    private int capacity;
    private Queue<T> items = new LinkedList<T>();

    public Holder(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void addItem(T item) throws InterruptedException {
        // Simulates a slow add; this runs while the Holder's lock is held.
        Thread.sleep(new Random().nextInt(2000));
        while (isFull()) {
            System.out.println("Holder FULL. adding operation is waiting... [" + item + "]");
            this.wait();
        }
        System.out.println(items.size() + " -- holder +++ added " + item);
        items.add(item);
        this.notifyAll();
    }

    public T getItem() throws InterruptedException {
        synchronized (this) {
            while (isEmpty()) {
                System.out.println("Holder EMPTY. getting operation is waiting...");
                this.wait();
            }
            T next = items.poll();
            System.out.println(items.size() + " -- holder --- removed " + next + " - remaining: " + items.size());
            this.notifyAll();
            return next;
        }
    }

    private synchronized boolean isEmpty() {
        return items.isEmpty();
    }

    private synchronized boolean isFull() {
        return items.size() >= capacity;
    }
}
class Producer implements Runnable {
    public static final int GENERATED_ITEMS_COUNT = 10;
    private int id;
    private Holder<String> holder;

    public Producer(int id, Holder<String> holder) {
        this.id = id;
        this.holder = holder;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < GENERATED_ITEMS_COUNT; i++) {
                String produced = "Message " + i + " from [P" + id + "] " + System.nanoTime();
                holder.addItem(produced);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class Consumer implements Runnable {
    private Holder<String> holder;

    public Consumer(Holder<String> holder) {
        this.holder = holder;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String consumed = holder.getItem();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public class ConsumerProducerApp {
    public static void main(String[] args) throws InterruptedException {
        Holder<String> coada = new Holder<String>(10);
        Thread consumer = new Thread(new Consumer(coada));
        consumer.start();
        Executor executor = Executors.newCachedThreadPool();
        for (int i = 1; i <= 9; i++) {
            executor.execute(new Producer(i, coada));
        }
    }
}
EDIT: So let's exclude the Thread.sleep from the equation. What if I have 100000 Producers, each producing messages? Aren't they blocking my Consumer because of the common lock on the Holder? Is there any way, maybe another pattern, that lets my Consumer do its job independently? From what I understand so far, my implementation is correct, and I may be trying to achieve the impossible?
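
To make "excluding the Thread.sleep" concrete, here is a minimal sketch of one way it could look, assuming the simulated work is moved into the producer so that it happens before the lock is taken. The names BriefLockHolder and SlowProducer, the count parameter, and the 50 ms bound are hypothetical and only for illustration; the wait/notifyAll logic is the same as in my Holder.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;

// Hypothetical variant of Holder: same wait/notifyAll logic, but no sleep
// inside the synchronized methods, so the lock is held only briefly.
class BriefLockHolder<T> {
    private final int capacity;
    private final Queue<T> items = new ArrayDeque<>();

    BriefLockHolder(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void addItem(T item) throws InterruptedException {
        while (items.size() >= capacity) {
            wait(); // releases the monitor while waiting for free space
        }
        items.add(item);
        notifyAll(); // wake a consumer that may be waiting for an item
    }

    public synchronized T getItem() throws InterruptedException {
        while (items.isEmpty()) {
            wait(); // releases the monitor while waiting for an item
        }
        T next = items.poll();
        notifyAll(); // wake producers that may be waiting for free space
        return next;
    }
}

// Hypothetical producer: the simulated work (the sleep) runs before the
// Holder's lock is taken, so other threads are not blocked during it.
class SlowProducer implements Runnable {
    private final int id;
    private final int count;
    private final BriefLockHolder<String> holder;
    private final Random random = new Random();

    SlowProducer(int id, int count, BriefLockHolder<String> holder) {
        this.id = id;
        this.count = count;
        this.holder = holder;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < count; i++) {
                Thread.sleep(random.nextInt(50)); // no lock is held here
                holder.addItem("Message " + i + " from [P" + id + "]");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop
        }
    }
}
```

In this sketch the Producers and the Consumer still share one lock, so add and poll are still serialized, but each of them holds the lock only for the queue operation itself rather than for the whole simulated delay.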