8
votes

Assume there are 1 producer P and 2 consumers C1 and C2. And there are 2 queues Q1 and Q2, both with a specific capacity.

P will produce items and put it into Q1 and Q2 alternately. Item is produced for specific consumer and cannot be consumed by other consumers. How can I implement the following in Java: After I start the 3 threads, if Q1 is empty, thread C1 is blocked until it is notified when there is something in Q1. So is Q2. And P will be blocked when both Q1 and Q2 are full until it is notified when either Q1 or Q2 is not full.

I was thinking to use BlockingQueue, which will block a consumer when its queue is empty. But the problem is when either of queues is full, the producer will be blocked. Is there any data structure in Java we can use to solve this problem?

Update

I've got a solution myself, but I'm not sure whether it is efficient. We can still have 2 BlockingQueues. And when consumer take item from its queue, it uses BlockingQueue.take(), so it will be blocked when there is no item in the queue. When producer add item to either queue, it uses BlockingQueue.offer(). So that it will never be blocked by this operation and will get 'false' if the queue is full. In addition, we keep a AtomicInteger to indicate the number of queues which is not full. Every time when producer P want to put an item in a queue, if it get false return, we decrease the AtomicInteger by 1. When it reaches 0, the producer call AtomicInteger.wait(). Every time when a consumer takes an item from its queue, it also examines the AtomicInteger. When it is 0, the consumer increase it by 1 and call AtomicInteger.notify().

Please let me know whether this solution makes sense.

Thanks a lot!

3
P will be blocked when both Q1 and Q2 are full Blocked on what, Q1, Q2 or some other synchronizer?John Vint
You may not need two queues. Use a single BlockingQueue, call offer(E e, long timeout, TimeUnit unit) from the producer and call take from the consumers, and then you don't need to worry about uneven queue sizesZim-Zam O'Pootertoot
Hi Zim-Zam O'Pootertoot, thank you. I modified the question, "Item is produced for specific consumer and cannot be consumed by other consumers." So I don't know whether in this case we can use a single BlockingQueue. This is actually a real problem I met.user2440712
Hi John Vint, thank you. P is supposed to be blocked util either of the queue is not full. It doesn't matter what it is blocked on.user2440712
In that case it makes sense to use separate queues. Use offer(E e, long timeout, TimeUnit unit) from the producer and take from consumers so that the three threads will block until space / items are available. If you want to still be able to produce items for queue1 when queue2 is full or vice versa then you probably want two producers instead of one.Zim-Zam O'Pootertoot

3 Answers

1
votes

Have you considered a Striped Executor Service. This will allow you to solve your problem and put your consumers into a pool which would be much more efficient.

0
votes

No matter which data structure/messagig server you choose, you can go out of resources with any of these. There is always a limit on memory or disk space.

So actually it's not bad that the producer is stopped.

If your queues are filling up, you should try to restore the balance: You could add more consumers. You could improve the consumer's performance. If this is not possible, something should really throttle the producer. It's a way to avoid out of memory errors or no space left on device.

Finally it's the data center's duty to monitor the queues anyway. They should inform you, if the filling degree of your queues reaches a limit, for example > 80%.

Update

If the producer cannot send on all queues, because one of its queues is full, it is up to him to buffer, but buffering is something the queues should do.