1
votes

I want to implement a thread safe Map of Queues.

I intend to start with an empty Map. If the key does not exist, I want to create a new Map entry with a new Queue. If the key does exist, I want to add to the existing Queue. My proposed implementation is as follows:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class StackOverFlowExample {

    private final Map<String, ConcurrentLinkedQueue<String>> map = new ConcurrentHashMap<>();

    public void addElementToQueue(String key, String value){
        if (map.containsKey(key)){
            map.get(key).add(value);
        }
        else{
            ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
            queue.add(value);
            map.put(key, queue);
        }           
    }    
}

My concern is that when multiple threads attempt to add a value for the same new key, the first will put a new Map entry with a new Queue, and the second will wait and then put another new Queue for that key, rather than adding to the existing Queue. My concurrency / concurrency API knowledge is slim at best. Perhaps the concurrent collections already guard against this? Advice would be much appreciated.

3 Answers

4
votes

This pattern has probably been posted many times on SO (efficiently adding to a concurrent map):

Queue<String> q = map.get(key);
if(q == null) {
  q = new ConcurrentLinkedQueue<String>();
  Queue<String> curQ = map.putIfAbsent(key, q);
  if(curQ != null) {
    q = curQ;
  }
}
q.add(value);

Note that since Java 8, this can be replaced with computeIfAbsent().
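
As a minimal sketch of that Java 8 variant (the class name and the queueSize helper are made up for illustration; the map layout follows the question), the whole get / putIfAbsent dance collapses into one call:

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical class name; the field mirrors the map from the question.
class ComputeIfAbsentExample {

    private final Map<String, Queue<String>> map = new ConcurrentHashMap<>();

    public void addElementToQueue(String key, String value) {
        // computeIfAbsent creates the queue only if the key has no mapping yet
        // and returns the queue that is actually stored in the map, so every
        // caller for the same key adds to the same queue.
        map.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(value);
    }

    // Helper added for illustration only: number of values queued under a key.
    public int queueSize(String key) {
        Queue<String> q = map.get(key);
        return q == null ? 0 : q.size();
    }
}
```

On a ConcurrentHashMap the computeIfAbsent call is performed atomically, so two threads racing on the same missing key should end up sharing a single queue.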

1
votes

So your fear is that thread A and thread B will do the following:

Thread A:
    lock ConcurrentHashMap
    look for Queue "x" (not found)
    unlock ConcurrentHashMap
    create Queue "x"
    lock ConcurrentHashMap
    insert Queue "x"
    unlock ConcurrentHashMap

Thread B:
    lock ConcurrentHashMap (while thread A is in 'create Queue "x"')
    look for Queue "x" (not found)
    unlock ConcurrentHashMap (thread A then gets the lock)
    create Queue "x" v2
    lock ConcurrentHashMap
    insert Queue "x" v2 (overwriting the old entry)
    unlock ConcurrentHashMap

That is in fact a real issue, but one that is easily resolved by making addElementToQueue a synchronized method. Then only one thread can be inside addElementToQueue on a given instance at any time, so the synchronization hole between the first 'unlock' and the second 'lock' is closed. The price is that all calls on that instance are serialized, even those for different keys.

Thus

public synchronized void addElementToQueue(String key, String value){

should resolve your lost-queue problem.
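
For completeness, here is a rough sketch of what the whole class could look like with that change. The class name and the queueSize helper are invented for illustration, and the method body is restructured slightly from the question's version, so treat it as one possible shape rather than the only one:

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical class name; the field follows the question's example.
class SynchronizedExample {

    private final Map<String, Queue<String>> map = new ConcurrentHashMap<>();

    // synchronized: at most one thread per instance runs this method at a time,
    // so the lookup and the put below cannot interleave with another call.
    public synchronized void addElementToQueue(String key, String value) {
        Queue<String> queue = map.get(key);
        if (queue == null) {
            queue = new ConcurrentLinkedQueue<>();
            map.put(key, queue);
        }
        queue.add(value);
    }

    // Helper added for illustration only: number of values queued under a key.
    public synchronized int queueSize(String key) {
        Queue<String> queue = map.get(key);
        return queue == null ? 0 : queue.size();
    }
}
```

This only holds as long as every write to the map goes through synchronized methods of the same instance, which is the case here because the map is private.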

0
votes

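If Java 8 is an option:

// requires: import java.util.Arrays;
public void addElementToQueue(String key, String value) {
    // merge() stores the new single-element queue if the key is absent;
    // otherwise it appends that queue's contents to the existing queue.
    map.merge(key, new ConcurrentLinkedQueue<>(Arrays.asList(value)), (oldValue, coming) -> {
        oldValue.addAll(coming);
        return oldValue;
    });
}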