4 votes

I am looking for an existing implementation of a concurrent queue that can be used to implement a typical producer-consumer pattern, with the following caveat: each producer is short-lived, but before exiting it has to block until all the messages it put on the queue have been processed by the consumers. The consumers and the queue are long-lived.

Some ideas I've had were around either using a single partitioned queue where each partition is assigned to a producer, or having a dedicated queue for each producer and building a sort of composite queue on top of the producer queues which the consumers would read from.

This seems like it would be a common pattern for HTTP servers, which have numerous HTTP threads that would act as producers to the queue and a fixed number of worker threads that would act as consumers of the queue. The HTTP thread would then block until the work it enqueued is completely processed by the consumers before returning the HTTP response to the HTTP client.
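To make the requirement concrete, here is a rough sketch of the shape I have in mind; WorkItem, produceAndWait, consumerLoop and process are placeholder names, not an existing API. Each message carries a CompletableFuture that a consumer completes, and the producer blocks on allOf of its own futures before returning.

import java.util.*;
import java.util.concurrent.*;

// Hypothetical sketch: each enqueued item carries a CompletableFuture, and the
// producer (e.g. an HTTP thread) blocks until the long-lived consumers have
// completed every item it enqueued.
class WorkItem {
    final String payload;                                   // placeholder payload type
    final CompletableFuture<Void> done = new CompletableFuture<>();
    WorkItem(String payload) { this.payload = payload; }
}

class Example {
    static final BlockingQueue<WorkItem> QUEUE = new LinkedBlockingQueue<>();

    // Producer side: enqueue all messages, then block until each is processed.
    static void produceAndWait(List<String> payloads) throws Exception {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String p : payloads) {
            WorkItem item = new WorkItem(p);
            QUEUE.put(item);
            futures.add(item.done);
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
    }

    // Consumer side (long-lived worker thread): process items and signal completion.
    static void consumerLoop() {
        while (true) {
            try {
                WorkItem item = QUEUE.take();
                process(item.payload);            // placeholder for the real work
                item.done.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    static void process(String payload) { /* ... */ }
}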


1 Answer

2 votes

Each producer is short-lived, but before exiting it has to block until all the messages it put on the queue have been processed by the consumers. The consumers and the queue are long-lived.

Collection.parallelStream() does this. It is a common pattern which can be solved in a few ways, e.g. ExecutorService.submit(task) + Future.get().
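For example, a minimal sketch of the ExecutorService.submit(task) + Future.get() approach; the fixed pool stands in for your long-lived consumers, and ProducerConsumerSketch / produceAndWait are placeholder names:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class ProducerConsumerSketch {
    // Long-lived, fixed-size pool of "consumer" worker threads.
    static final ExecutorService WORKERS = Executors.newFixedThreadPool(4);

    // Called by a short-lived producer (e.g. an HTTP thread).
    static void produceAndWait(List<Runnable> tasks) throws Exception {
        List<Future<?>> futures = new ArrayList<>();
        for (Runnable task : tasks)
            futures.add(WORKERS.submit(task));   // enqueue onto the executor's internal queue
        for (Future<?> f : futures)
            f.get();                             // block until every submitted task is done
    }
}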

build a sort of composite queue on top of producer queues

I would try to make it as simple as possible.

What works best depends on the workload of your tasks, e.g. whether they are CPU-bound or IO-bound.
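For instance, a rough rule of thumb for sizing the consumer pool; the IO-bound multiplier is an assumption to tune against your own workload, not a measured recommendation:

int cores = Runtime.getRuntime().availableProcessors();
// CPU-bound work: roughly one thread per core.
java.util.concurrent.ExecutorService cpuBound =
        java.util.concurrent.Executors.newFixedThreadPool(cores);
// IO-bound work: threads spend most of their time waiting, so a larger pool helps.
java.util.concurrent.ExecutorService ioBound =
        java.util.concurrent.Executors.newFixedThreadPool(cores * 4);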

Pull, say, 1000 messages off the queue, process them, then notify the producers of the messages that were processed successfully:

// toProcess is a batch of up to 1000 messages pulled from the queue
List<Message> toProcess = someSourceOfMessages(1000);
// forEach on a parallel stream blocks until every message has been processed
toProcess.parallelStream().forEach(m -> process(m));
// all messages are complete here; notify the producers.