3
votes

I have a Producer/Consumer scenario in which I want one Producer to deliver products and multiple Consumers to consume them. However, in the usual setup a delivered product is consumed by only one Consumer, and the other Consumers never see that product. What I want to accomplish is that each product is consumed once by each Consumer, without any kind of blocking.

My first idea was to use multiple BlockingQueues, one for each Consumer, and have the Producer put each product sequentially into all available BlockingQueues. However, if one of the queues blocks (e.g., because it is full), the Producer can't go on. On the other hand, spawning a new Thread for each product to put it into the queue doesn't seem like a good solution either, since more and more threads will be created while the queue is full.

The problem is that in my scenario one Consumer could need much more time to process a product than another, depending on the input. It would also be hard to choose the capacity of the BlockingQueues, since a Consumer's processing time is not really predictable.

Can one think of a more elegant solution? Or is it possible to increase/decrease the capacity of the BlockingQueues dynamically to balance the filling level?

2
Ideally, you shouldn't be processing data while taking data from the BlockingQueue, so the problem in your third paragraph shouldn't even be a problem. To be honest, I'm not sure how a single BlockingQueue wouldn't suffice... And there are BlockingQueues that effectively have unlimited capacity, such as LinkedBlockingQueue. - awksp
What is your scope? Single instance doing this process in-memory, Big Data (terabytes/petabytes of messages, tens to hundreds of nodes) or single-threaded-one-of-each-in-reality? - Esko
@Esko I want to read commands from a file/database and process them, while the same command is handled differently for different protocols (working on graphs). It runs locally on one machine. The commands are simple; however, I need to process up to many millions of them (probably a file size of a few GB). - S1lentSt0rm
@user3580294 Sure, I'm not processing data while taking it out of the queue, but with only one queue I would need to wait until the slowest worker returns and then start over with the next element. - S1lentSt0rm
I'm afraid I still don't quite understand, but seeing as you already found a solution I guess that's it. - awksp

2 Answers

1
votes

It looks like you need JMS in publish/subscribe style. You can try ActiveMQ (http://activemq.apache.org/download.html); it has many options, and you will certainly find one that suits your needs best.

1
votes

Take a look at Guava's EventBus, the part of the Guava library that deals with in-process PubSub-style communication. It might suit your needs and save you from reinventing the wheel.
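
To make the in-process publish/subscribe idea concrete, here is a minimal sketch using only the JDK. It is not Guava's EventBus API: the `Broadcaster` class and its `subscribe`, `publish` and `shutdown` methods are hypothetical names made up for illustration. It assumes that one unbounded `LinkedBlockingQueue` per Consumer is acceptable, so the Producer never waits on a slow Consumer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper (not a Guava or JMS class): every subscriber sees every product once.
final class Broadcaster<T> {
    private static final Object POISON = new Object(); // end-of-stream marker

    // One unbounded queue per subscriber, so publish() does not wait for a slow consumer.
    private final List<BlockingQueue<Object>> queues = new CopyOnWriteArrayList<>();
    private final List<Thread> workers = new CopyOnWriteArrayList<>();

    // Registers a handler that drains its own queue on its own thread, at its own pace.
    void subscribe(Consumer<T> handler) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> {
            try {
                for (Object item = queue.take(); item != POISON; item = queue.take()) {
                    @SuppressWarnings("unchecked")
                    T product = (T) item;
                    handler.accept(product);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        queues.add(queue);
        workers.add(worker);
        worker.start();
    }

    // Copies the product reference into every subscriber's queue.
    void publish(T product) {
        for (BlockingQueue<Object> queue : queues) {
            queue.add(product);
        }
    }

    // Signals end of stream and waits until every subscriber has drained its queue.
    void shutdown() {
        for (BlockingQueue<Object> queue : queues) {
            queue.add(POISON);
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Broadcaster<Integer> bus = new Broadcaster<>();
        List<Integer> fast = Collections.synchronizedList(new ArrayList<>());
        List<Integer> slow = Collections.synchronizedList(new ArrayList<>());
        bus.subscribe(fast::add);
        bus.subscribe(n -> { sleepQuietly(20); slow.add(n); }); // simulated slow consumer
        for (int i = 1; i <= 3; i++) {
            bus.publish(i);
        }
        bus.shutdown();
        System.out.println(fast + " " + slow); // prints "[1, 2, 3] [1, 2, 3]"
    }
}
```

The trade-off in this sketch is memory: because the queues are unbounded, a Consumer that stays much slower than the Producer lets its backlog grow without limit, which may matter for inputs of several GB. Subscribers are expected to be registered before publishing starts.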

If you are looking for interprocess communication, consider a message queue, as Evgeniy proposed.