2
votes

I'm testing bulk loading with Titan 1.0.0.

I set storage.batch-loading=true in my configuration file.

I'm using TitanBlueprintsTransaction for graph loading performance in my multi-threaded Java program.

I'm getting a ConcurrentModificationException during the load process. My code looks like this.

Thread #1 searches for a vertex using a Titan composite index:

Iterator<TitanVertex> it = tx.query().has("key", key).vertices().iterator();
TitanVertex vtx = it.next();

Thread #2 tries to add an edge to the same vertex:

tx.getVertex(v).addEdge(edgeLabel, target);

Maybe the exception occurs when one thread has an iterator open and the other modifies the same vertex. How can I resolve this exception?

What are the return types of tx.query().has("key", key).vertices() and tx.getVertex(v)? - hagrawal
Iterable<TitanVertex> and TitanVertex, respectively. - seungwon
Can you post more of the code, such as where the transactions are created and completed? How about a stack trace too? - Jason Plurad

2 Answers

0
votes

In previous versions, storage.batch-loading was for single-threaded operation only (see http://thinkaurelius.github.io/titan/wikidoc/0.4.4/Graph-Configuration.html). It sounds like that may still be the case in Titan 1.0.
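
If batch loading really is limited to a single thread, one option is to keep the parsing multi-threaded but hand every mutation to one dedicated writer thread. The sketch below shows only that pattern in plain Java. It uses no Titan API: the ArrayList is a stand-in for the graph, and SingleWriterDemo, load and shutdownAndWait are hypothetical names. Whether this satisfies Titan 1.0's batch-loading requirements is an assumption I have not verified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleWriterDemo
{
    // Producers prepare "edges" in parallel; only the writer thread touches the graph.
    static List<String> load(int producers, int edgesPerProducer)
    {
        final List<String> graph = new ArrayList<>(); // stand-in for the graph, not thread-safe
        ExecutorService writer = Executors.newSingleThreadExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(producers);

        for(int p=0;p<producers;++p)
        {
            final int id = p;
            workers.execute(() -> {
                for(int e=0;e<edgesPerProducer;++e)
                {
                    final String edge = id + "->" + e;     // parallel preparation work
                    writer.execute(() -> graph.add(edge)); // mutation on the single writer thread
                }
            });
        }

        shutdownAndWait(workers); // all mutations have been queued after this returns
        shutdownAndWait(writer);  // all mutations have been applied after this returns
        return graph;
    }

    static void shutdownAndWait(ExecutorService pool)
    {
        pool.shutdown();
        try
        {
            if(!pool.awaitTermination(1, TimeUnit.MINUTES))
                throw new IllegalStateException("pool did not finish in time");
        }
        catch(InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args)
    {
        System.out.println("edges loaded: " + load(4, 1000).size());
    }
}
```

In a real loader the writer task would call into the transaction instead of graph.add, so that lookups and addEdge calls never overlap.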

0
votes

I'm not familiar with Titan, but when I face a multithreading issue, I try to work out where the concurrent access occurs, what the smallest region is that I can synchronize, and whether better alternatives are available (e.g. thread-safe Collection alternatives, or another way to access the data).

This code models this error:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class ConcurrentMod
{

    public static <C extends Collection<String>> void fillCollectionTestData(C coll)
    {
        for(int i=0;i<10000;++i)
            coll.add(String.valueOf(i));
    }

    public static void main(String[] args)
    {
        final List<String> data = new ArrayList<>();
        fillCollectionTestData(data);

        new Thread()
        {
            public void run()
            {
                //heavy operation for an array
                for(int i=0;i<100;++i)
                    data.remove(i);
            }
        }.start();

        for(String s:data)
            s.length();
    }
}

Regular lists (ArrayList, LinkedList) can't handle concurrent access. But as I tested, even Vector's iterator can't handle this case (surprising, but good to know). So even if you can modify the implementing code, simply changing the Collection type to another concurrency-safe type is not an easy fix.
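
To make the iterator behaviour concrete, here is a small single-threaded sketch (FailFastDemo and failsOnModification are names I made up for illustration). It modifies each list while iterating over it and reports whether the iterator throws. ArrayList and Vector both have fail-fast iterators, while java.util.concurrent.CopyOnWriteArrayList iterates over a snapshot and does not throw. Note that CopyOnWriteArrayList copies the whole array on every write, so it is probably a poor fit for a write-heavy load.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;

class FailFastDemo
{
    // Returns true if removing an element during iteration makes the iterator throw.
    static boolean failsOnModification(List<String> list)
    {
        list.clear();
        for(int i=0;i<10;++i)
            list.add(String.valueOf(i));
        try
        {
            for(String s:list)
                if(s.equals("0"))
                    list.remove("9"); // structural change while the iterator is open
            return false;
        }
        catch(ConcurrentModificationException e)
        {
            return true;
        }
    }

    public static void main(String[] args)
    {
        System.out.println("ArrayList: " + failsOnModification(new ArrayList<>()));
        System.out.println("Vector: " + failsOnModification(new Vector<>()));
        System.out.println("CopyOnWriteArrayList: " + failsOnModification(new CopyOnWriteArrayList<>()));
    }
}
```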

Another idea: if the return value of "tx.query().has("key", key).vertices()" is based on Collection, you can call its toArray(new Type[0]) and iterate through the copy. In this example, change the iterating loop in main to: for(String s:data.toArray(Mirror.emptyStringArray)) s.length(); (Mirror.emptyStringArray here stands for an empty String[], like new String[0].)

This solves the problem, BUT there may still be concurrency issues if the underlying Collection implementation doesn't manage concurrent access. An example: a type declares that it never contains null values, BUT if one thread calls delete while another calls toArray(T), you may end up with an array that contains a null value, because you see a view of an incomplete operation. The delete is half done, and consistency has not been restored because it has not fully completed. If you can't be sure that the underlying implementation handles this, you need to use external synchronization:

    for(int i=0;i<100;++i)
        //other threads can access the list between two heavy array shifts
        synchronized(data)
        {
            data.remove(i);
        }

    //and synchronize while copying the array
    String[] tmp = null;
    synchronized(data)
    {
        tmp = data.toArray(Mirror.emptyStringArray);
    }

    for(String s:tmp)
        s.length();