1
votes

I am trying to implement a simple producer/consumer system in Java 11. Basically, I use one thread for each role, plus a global queue, as follows:

  • A global priority queue.
  • The first thread, the producer, runs an HTTP server, listens for incoming HTTP messages, and upon receiving a message, pushes it as a job onto the queue (queue.size() increments).
  • The second thread, the consumer, continuously peeks at the queue. If there is a job (job != null), it submits an HTTP request somewhere and, upon successful receipt, polls the job from the queue (queue.size() decrements).

The skeleton is as below:

Main Class:

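import java.util.PriorityQueue;
import java.util.Queue;

public class Manager
{
    // Static so that the static main method can assign them
    private static Consumer consumer;
    private static Producer producer;
    // Shared by both threads via Manager.queue; initial capacity 2000
    static Queue<Job> queue = new PriorityQueue<>(2000);

    public static void main (String[] args)
    {
        consumer = new Consumer();
        producer = new Producer();
    }
}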

Producer class:

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
            //HTTP server starts, listens, and adds to the queue upon receiving a Job
            server.start();
            Manager.queue.add(new Job());
    }
}

Consumer class:

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
    // Thread.sleep(1);

        while(true)
        {
            //get an object off the queue
            Job job= Manager.queue.peek();
            //do some stuff with the object
        }
    }
}

The producer and the queue work fine. The problem is with the consumer: the code above (with the while(true) loop) never sees the item when it peeks. But when I add a Thread.sleep(x) before the while(true) loop, even with x = 1 ms, it works and grabs the item successfully.

What is the problem? In theory, the while(true) loop shouldn't be an issue. Why can't it see and peek the item?

2
What are you initializing queue with? - Jacob G.
Queue is empty initially. Just with a size of 2000. - Tina J
Unfortunately that does not help me. What class are you using to initialize queue? Can you post the full code? - Jacob G.
static PriorityQueue<Job> jobSubmissionQueue - Tina J
PriorityQueue is not thread-safe. You can use PriorityBlockingQueue in place of it, which is thread-safe. - Jacob G.

2 Answers

1
votes

PriorityQueue is not thread-safe whereas PriorityBlockingQueue is. As long as you aren't using any methods defined in the BlockingQueue interface, these two implementations are interchangeable. Simply changing PriorityQueue to PriorityBlockingQueue should fix your problem.
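
As a rough illustration of that swap, here is a minimal sketch. The Job class with a priority field, the BlockingQueueDemo class, and runDemo are all made up for the example (the question does not show Job's contents), and the HTTP parts are left out. It uses take(), which is a BlockingQueue method that waits for an item and removes it in one step, so it differs from the peek-then-poll flow in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class BlockingQueueDemo {
    // Hypothetical Job: an int priority is assumed purely for illustration.
    static final class Job implements Comparable<Job> {
        final int priority;

        Job(int priority) {
            this.priority = priority;
        }

        @Override
        public int compareTo(Job other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Runs one producer and one consumer; returns the priorities the consumer handled.
    static List<Integer> runDemo(int jobCount) throws InterruptedException {
        PriorityBlockingQueue<Job> queue = new PriorityBlockingQueue<>();
        List<Integer> handled = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < jobCount; i++) {
                    // take() blocks until a job is available, so no busy loop is needed
                    Job job = queue.take();
                    handled.add(job.priority);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread producer = new Thread(() -> {
            for (int i = jobCount; i >= 1; i--) {
                queue.add(new Job(i)); // safe to call from any thread
            }
        });

        consumer.start();
        producer.start();
        producer.join();
        consumer.join(); // join() makes the consumer's writes to 'handled' visible here
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Handled " + runDemo(5).size() + " jobs");
    }
}
```

The order in which jobs are handled depends on thread timing, since the consumer may take a job before the producer has added the rest; only the jobs present in the queue at the moment of each take() are compared by priority.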

1
votes

The cause of the problem: non-synchronized reading and writing from and to a queue.

What happens here is that the two threads, possibly running on different CPU cores, have no guarantee of seeing each other's changes to the queue. The producer may be adding items, and those changes may even be propagated to RAM, but without synchronization the consumer is free to keep working with its own cached view of the queue, which stays empty.

The Thread.sleep() workaround happens to work because, after waking up, the thread has to reload its state from RAM, where it has probably changed. Note that Thread.sleep() has no synchronization semantics in the Java memory model, so this behavior is not guaranteed.

The correct way is to access the queue only while synchronized on it, as follows:

In producer:

synchronized(Manager.queue) {
     Manager.queue.add(new Job());
}

and in Consumer:

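boolean running = true; // "continue" is a reserved word and cannot be a variable name
while (running) {
    synchronized(Manager.queue) {
        // Queue has no pop(); poll() removes the head, or returns null if the queue is empty
        Job job = Manager.queue.poll();
    }
}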

And as a final touch: the while (true) loop is a busy-wait and incredibly inefficient. You could block instead, using Object.wait() and Object.notify().

In producer:

synchronized(Manager.queue) {
     Manager.queue.add(new Job());
     Manager.queue.notify();
}

and in Consumer:

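boolean running = true; // "continue" is a reserved word and cannot be a variable name
while (running) {
    synchronized(Manager.queue) {
        // Re-check in a loop to guard against spurious wakeups; wait() releases the lock while waiting
        while (Manager.queue.peek() == null) {
            Manager.queue.wait(); // throws InterruptedException, which run() must catch
        }
        // Queue has no pop(); poll() removes and returns the head
        Job job = Manager.queue.poll();
    }
}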
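
To show the wait()/notify() pattern end to end, here is a small self-contained sketch. The class WaitNotifyDemo and its produce, consume, and runDemo methods are made up for this example, and plain Integer values stand in for Job objects, which the question does not define.

```java
import java.util.PriorityQueue;
import java.util.Queue;

class WaitNotifyDemo {
    // Plain PriorityQueue: every access below happens while holding its monitor
    private static final Queue<Integer> queue = new PriorityQueue<>();

    static void produce(int job) {
        synchronized (queue) {
            queue.add(job);
            queue.notify(); // wake the consumer if it is waiting
        }
    }

    static int consume() throws InterruptedException {
        synchronized (queue) {
            // Loop rather than 'if', to guard against spurious wakeups
            while (queue.peek() == null) {
                queue.wait(); // releases the monitor until notified
            }
            return queue.poll();
        }
    }

    // Consumes three jobs on a second thread and returns their sum.
    static int runDemo() throws InterruptedException {
        int[] sum = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    sum[0] += consume();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int job = 1; job <= 3; job++) {
            produce(job);
        }
        consumer.join(); // join() makes the consumer's write to sum[0] visible here
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // prints 6
    }
}
```

With a single consumer, notify() is enough; with several consumers waiting on the same queue, notifyAll() is the safer choice.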