3
votes

I need the producer to produce data one item at a time, and the consumer to consume the data in batches (blocking until the queue holds enough elements). But Java's BlockingQueue seems to only let the consumer take elements one at a time.

Kafka seems to be a solution, but is there a simpler solution to the problem?

2
Is there a single consumer or multiple consumers? And if multiple consumers, does the order in the queue matter? - Roger Gustavsson
@RogerGustavsson Single consumer and single producer. - ZhaoGang
@JoopEggen If there's only one item in the queue, that will still return immediately, not when the queue size has reached 2. The second parameter is the maximum number of elements, but fewer may be transferred. - Roger Gustavsson

2 Answers

0
votes

Let the consumer take items from the queue one by one and put them on its own list, doing nothing else. When the list has reached a certain size, start processing the list.
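
A minimal sketch of this approach, assuming the single producer and single consumer mentioned in the comments. The class name `BatchingConsumer`, the helper `takeBatch`, and the batch size of 5 are illustrative choices, not something from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchingConsumer {

    // Blocks until exactly batchSize elements have been taken from the queue.
    public static <T> List<T> takeBatch(BlockingQueue<T> queue, int batchSize)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize) {
            batch.add(queue.take()); // take() blocks while the queue is empty
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        // Producer: adds items one at a time.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                queue.add(i);
            }
        });
        producer.start();

        // Consumer: processes the 10 items in two batches of 5.
        for (int b = 0; b < 2; b++) {
            List<Integer> batch = takeBatch(queue, 5);
            System.out.println("Processing batch: " + batch);
        }
        producer.join();
    }
}
```

Note that with this sketch a trailing partial batch is never processed: if the producer stops before the list fills up, `takeBatch` keeps waiting.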

0
votes

If you need a pure Java solution, I recently developed this utility that batches BlockingQueue elements, using a flush timeout in case the queue doesn't reach the batch size. It also supports a fan-out pattern, using multiple instances to process the same set of data (and so use more CPU cores).

// Instantiate the registry
FQueueRegistry registry = new FQueueRegistry();

// Build FQueue consumer
registry.buildFQueue(String.class)
        .batch()
        .withChunkSize(5)
        .withFlushTimeout(1)
        .withFlushTimeUnit(TimeUnit.SECONDS)
        .done()
        .consume(() -> (broadcaster, elms) -> System.out.println("elms batched are: " + elms.size()));

// Push data into queue
for (int i = 0; i < 10; i++) {
    registry.sendBroadcast("Sample" + i);
}

More info here!

https://github.com/fulmicotone/io.fulmicotone.fqueue
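
For comparison, the flush-timeout idea can also be sketched with nothing but the JDK's `BlockingQueue.poll(timeout, unit)`. This is only a rough illustration and not how the library above is implemented. `TimedBatcher` and `pollBatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedBatcher {

    // Collects up to batchSize elements. Returns early, possibly with a
    // smaller or empty list, once the flush timeout has elapsed.
    public static <T> List<T> pollBatch(BlockingQueue<T> queue, int batchSize,
                                        long timeout, TimeUnit unit)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(batchSize);
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (batch.size() < batchSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // flush timeout reached
            }
            T item = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (item == null) {
                break; // nothing arrived before the deadline
            }
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 7; i++) {
            queue.add("Sample" + i);
        }
        // First call fills a whole batch; second call flushes the remainder
        // after waiting out the timeout.
        System.out.println("batch size: " + pollBatch(queue, 5, 1, TimeUnit.SECONDS).size());
        System.out.println("batch size: " + pollBatch(queue, 5, 1, TimeUnit.SECONDS).size());
    }
}
```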