0
votes

Recently I came across this question : There are say 3 consumer threads and need to implement a lock-free queue(can't use synchronization) so that no consuming thread is blocked. Assume that queue already contains the data.

I thought about it for a while and came across Atomic operations which if used carefully can help. My implementation is as shown below. As data is already there in queue I have not implemented the enqueue method and populating the array inside the constructor.

public class SPMCQueue {

    private AtomicInteger index = new AtomicInteger(0);

    public int[] arr;

    public SPMCQueue(int size) {
        arr = IntStream.range(0, size).toArray();
    }

    public Integer dequeue() {
        Integer ret = null;
        int x = index.getAndIncrement();
        if (x < arr.length) {
            ret = arr[x];
            System.out.println(arr[x] + " by " + Thread.currentThread().getName());
        }
        else {
            throw new RuntimeException("Queue is empty");
        }
        return ret;
    }
}

class QueueTest {
    public static void main(String[] args) {

        SPMCQueueq = new SPMCQueue(40);

        Runnable t1 = () -> {
            try {
            while (true) {
                q.dequeue();
            }
            }catch(Exception e) {

            }
        };

        Runnable t2 = () -> { 
            try {
            while(true) { q.dequeue(); }
            }catch(Exception e) {

            }
        };

        Runnable r3 = () -> { 

            try {
                while(true) { q.dequeue(); }
            } catch (Exception e) {
                // TODO Auto-generated catch block
                //e.printStackTrace();
            }

        };

        Thread thread1 = new Thread(t1);
        Thread thread2 = new Thread(t2);
        Thread thread3 = new Thread(r3);

        thread1.start();
        thread2.start();
        thread3.start();

    }
}

I have executed the above program and the result shows that the all 3 consumers are consuming the data albeit out of order and some threads are consuming more data than the other threads but I don't see any of the data appearing multiple times in the o/p.

I have the following questions:

  1. Is there any issue in the above implementation?

  2. What are the other ways to implement lock-free consumer queue?

1
"What are the other ways to implement lock-free consumer queue?" See java.util.concurrent.ConcurrentLinkedQueueAndreas
"no consuming thread is blocked" - in case the queue become empty, what do you expect the consumer thread to do?Alexei Kaigorodov
@AlexeiKaigorodov assumption is queue has infinite data. Also, the problem is more about how to consume it in lock-free way without using synchronization.Yug Singh
@YugSingh I mean, when producer is slow and consumers are fast, the queue becomes empty. What do you expect consumer should do in this case, if not to block? Sleep and poll? Or waste CPU cycles until next item is produced?Alexei Kaigorodov

1 Answers

0
votes

I want to give my reply in tandem with answer: https://stackoverflow.com/a/21101904/7340499 since it is a similar question to yours.

So, your questions:

but I don't see any of the data appearing multiple times in the o/p.

This is expected. Because, getAndIncrement() is atomic, and anytime that function is accessed, you will get a different x value, hence a different output. However, due to combining "getAndIncrement()" and "System.out.print()" functions in a single non-atomic dequeue operation, your output may sometimes be out of order, e.g. you get x=1 on one thread, and another thread interrupts, gets x=2 and prints it, and then your initial thread finalizes printing. I believe this also points out the issues of your implementation, as asked in (1). Is your application okay with queue being processed out of order?

What are the other ways to implement lock-free consumer queue?

Well, atomic operations are one way, as you have noticed. But in essence, they are very much like locks, anyway. They just operate on a smaller scale (there are some implementation differences, though). So it is hard to conceptualize them as different methods, at least for myself. Other than that, I believe there are some nice resources over here: Lock Free Queue -- Single Producer, Multiple Consumers