1
votes

I have a primitive messaging system inside my application. A message can be submitted by the producer from one thread and processed by the consumer in another thread. There are only two threads by design, one for the consumer and one for the producer, and it's not possible to change this logic.

I'm using a ConcurrentLinkedQueue<> to pass the messages:

// producer's code (adds the request)
this.queue.add(req);

// consumer's code (busy loop with request polling)
while (true) {
  Request req = this.queue.poll();
  if (req == null) {
    continue;
  }
  if (req.last()) {
    // last request submitted by the producer
    return;
  }
  // function to process the request
  this.process(req);
}

The processing logic is very fast; the consumer may receive about X_000_000 requests per second.

But using a profiler I've discovered that queue.poll() is sometimes very slow, seemingly when the queue is receiving a lot of new items from the producer: it's about 10x slower than polling an already filled queue that no other thread is adding items to.

Is it possible to optimize this? What is the best Queue<> implementation for this particular case (one thread calling poll() and one thread calling add())? Or would it be easier to implement a simple queue myself?

Does this answer your question? LinkedBlockingQueue vs ConcurrentLinkedQueue - Amongalen
@Amongalen thanks for the link, but I don't think so. LinkedBlockingQueue performs worse in my case: I tried it, and it was more than 2x slower than the non-blocking ConcurrentLinkedQueue. So neither queue implementation is suitable for my application. - Kirill
Have you tried SynchronousQueue? This class is used when one thread wants to hand off data to another thread. - Eric
@Eric And how will the producer be able to add multiple elements to the queue to store them while the consumer is busy working on one? - akuzminykh
For what it's worth, I've successfully used a circular buffer for this very scenario in the past with relatively little effort. If I recall correctly, all that was needed was a (wrapping) write index, a (wrapping) read index, and a monitor variable for the writer to kick-start the waiting reader. The reader never updates the write index, the writer never updates the read index. As I recall, I was using a pre-allocated, fixed-length array. A dynamically resizing queue would likely have been more complicated to do. - 500 - Internal Server Error

1 Answer

3
votes

The consumer is slower while the producer is producing because almost every read is a cache miss: the element it polls has only just been written by the producer thread, so the data is not yet in the consumer core's cache. If all elements are already present, they can be fetched together, which improves throughput.

When busy-waiting, consider calling Thread.onSpinWait() (available since Java 9). It tells the runtime that the thread is in a spin-wait loop: while it adds latency, it also enables certain performance optimizations.

// consumer's code (busy loop with request polling)
while (true) {
  Request req = this.queue.poll();
  if (req == null) {
    Thread.onSpinWait();
    continue;
  }
  if (req.last()) {
    // last request submitted by the producer
    return;
  }
  // function to process the request
  this.process(req);
}

The JDK does not have queues optimized for SPSC (Single-Producer Single-Consumer) scenarios, but there are libraries for that, such as Agrona or JCTools. Implementing such a queue yourself is not easy.

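// Use one or the other; both implement java.util.Queue and are bounded.
// Agrona (org.agrona.concurrent.OneToOneConcurrentArrayQueue)
Queue<Request> queue = new OneToOneConcurrentArrayQueue<>(2048);
// JCTools (org.jctools.queues.SpscArrayQueue)
Queue<Request> queue = new SpscArrayQueue<>(2048);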
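
To give a rough idea of what such a queue does internally, here is a minimal sketch of a bounded SPSC ring buffer with a wrapping write index and a wrapping read index. The class name SpscRingBuffer and its methods are made up for this illustration; this is not how Agrona or JCTools implement theirs (those add cache-line padding, cached indices and cheaper ordered stores), so don't expect it to match their performance. It assumes exactly one thread ever calls offer() and exactly one thread ever calls poll().

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration class, not part of the JDK, Agrona or JCTools.
final class SpscRingBuffer<E> {
    private final Object[] slots;
    private final int mask;
    // head: next slot to read, advanced only by the consumer thread.
    private final AtomicLong head = new AtomicLong();
    // tail: next slot to write, advanced only by the producer thread.
    private final AtomicLong tail = new AtomicLong();

    SpscRingBuffer(int capacity) {
        // A power-of-two capacity lets the indices wrap with a bit mask.
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = new Object[capacity];
        mask = capacity - 1;
    }

    /** Producer thread only. Returns false when the buffer is full. */
    boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException("null is reserved to signal 'empty'");
        }
        long t = tail.get();
        if (t - head.get() == slots.length) {
            return false; // full: the consumer has not caught up yet
        }
        slots[(int) (t & mask)] = e;
        tail.set(t + 1); // volatile write publishes the slot to the consumer
        return true;
    }

    /** Consumer thread only. Returns null when the buffer is empty. */
    @SuppressWarnings("unchecked")
    E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null; // empty
        }
        int i = (int) (h & mask);
        E e = (E) slots[i];
        slots[i] = null; // drop the reference so it can be garbage collected
        head.set(h + 1); // volatile write hands the slot back to the producer
        return e;
    }
}
```

Because the buffer is bounded, the producer has to handle a full buffer, for example by spinning on offer() until it returns true. The consumer loop above stays the same, with poll() called on this class instead of on ConcurrentLinkedQueue.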