0
votes

I am trying to understand if the below is thread safe, it was written by another developer whose code I have inherited and is no longer with us.

I have a BaseProvider class that is actually a message cache, represented by a LinkedBlockingQueue. This class stores incoming messages in the queue.

I have a set of worker threads that read of this queue. As such the LinkedBlockingQueue is thread safe.

Questions 1. When the worker thread calls provider.getNextQueuedItem(), the provider goes through item by item and adds it to a list and returns the list of messages. While it is doing this, what happens if there is a message added to the provider class by calling addToQueue? Does the takeLock internal to the LinkedBlockingQueue prevent from adding a new message to the queue until all messages are taken off the queue?

  1. As you would notice, each worker thread has access to all the providers, so while one worker thread is going through all the providers and calls getNextQueuedItem() , what happens when another worker thread also calls through all the providers and calls getNextQueuedItem()? Would both the worker threads be stepping over each other?

    public abstract class BaseProvider implements IProvider {

     private LinkedBlockingQueue<CoreMessage> internalQueue = new LinkedBlockingQueue<CoreMessage>();
    
    @Override
    public synchronized List<CoreMessage> getNextQueuedItem() {
        List<CoreMessage> arrMessages = new ArrayList<CoreMessage>();                    
        if (internalQueue.size() > 0) {
            Logger.debug("Queue has entries");    
            CoreMessage msg = null;
            try {
                msg = internalQueue.take();
            } catch (InterruptedException e) {
                Logger.warn("Interruption");
                e.printStackTrace();
            }
            if (msg != null) {
                arrMessages.add(msg);
            }
        }
        return arrMessages;
    }
    
    protected synchronized void addToQueue(CoreMessage message) {
        try {
            internalQueue.put(message);
        } catch (InterruptedException e) {
            Logger.error("Exception adding message to queue " + message);
        }
    }
    

    }

// There are a set of worker threads that read through these queues

public class Worker implements Runnable 
 @Override
 public void run() {
    Logger.info("Worker - Running Thread : " + Thread.currentThread().getName());

    while (!stopRequested) {
        boolean processedMessage = false;
        for (IProvider provider : providers) {
            List<CoreMessage> messages = provider.getNextQueuedItem();
            if (messages == null || messages.size() != 0) {
                processedMessage = true;
                for (CoreMessage message : messages) {
                    final Message msg = createEndurMessage(provider, message);
                    processMessage(msg);
                    message.commit();
                }
            }
        }
        if (!(processedMessage || stopRequested)) {
            // this is to stop the thread from spinning when there are no messages
            try {
                Thread.sleep(WAIT_INTERVAL);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

}

1
Out of curiosity, why does getNextQueuedItem() return a list of one message? Why does it not simply return the message? - Solomon Slow
Re, if (messages == null || messages.size() != 0) ... Not clear what the author was attempting to say here, but it doesn't matter because messages can never be null: The getNextQueueItem() method always returns a List, which may contain either zero or one items. Also, there's probably no reason to explicitly test the size of the messages list either because the for (...) loop is going to test it anyway. - Solomon Slow
Re, "When the worker thread calls provider.getNextQueuedItem(), the provider goes through item by item..." That does not appear to be true. The implementation of getNextQueuedItem() in your example does nothing but take one item from internalQueue, and then it returns that single item wrapped up in a new ArrayList. - Solomon Slow

1 Answers

0
votes

what happens if there is a message added to the provider class by calling addToQueue?

getNextQueuedItem() and addToQueue(...) are both synchronized methods. If those are the only two methods that access the private ... internalQueue, then there is no way in which multiple threads could ever access internalQueue at the same time.

while one worker thread is going through all the providers and calls getNextQueuedItem() , what happens when another worker thread also calls through all the providers and calls getNextQueuedItem()?

Are you asking about multiple workers accessing the same provider? That can't happen because getNextQueuedItem() is a synchronized method.

-- OR --

Are you asking about different workers accessing different providers? That should not matter (at least, not as far as the BaseProvider class is concerned) because there does not appear to be any way in which the different objects could be connected with each other.