I came across this Gist http://www.neo4j.org/graphgist?8012859 the other day and thought it would be handy for generating orderIds across an HA cluster. I have created a simple implementation, but my unit tests are failing because the count field does not provide unique values in a thread-safe manner.
There is a comment by Andres Taylor in the 2.0 version of the documentation which reads:
"We will be working more on isolation in 2.0. This problem should be fixed before the final release."
I am using Neo4j embedded 2.1.3, Neo HA and Spring Data Neo4j 3.1.4, and whilst I cannot find anything explicit in the release notes, the "Provisional Feature" warning has been removed.
I believe that the ON CREATE part of the MERGE query is happening in isolation, but that the ON MATCH part is operating on stale data: it acquires a write lock on the single unique node but does not re-read the data after waiting for the lock. Looking at the transaction documentation, it looks like the default isolation level is READ COMMITTED. Because of the nature of my MERGE query (combined read/write semantics: nid.count = nid.count + 1), should I be manually acquiring an exclusive lock before performing the operation?
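To make the suspicion concrete, here is a minimal, deterministic sketch in plain Java (not Neo4j code). It replays the interleaving I think is happening: two transactions both read nid.count before either writes, versus each transaction reading only after it holds the lock. The class LostUpdateSketch, the Counter type and both method names are hypothetical and exist only for this illustration; this is my assumption about the behaviour, not a confirmed description of what Cypher does internally.

```java
import java.util.Arrays;

final class LostUpdateSketch {

    /** Hypothetical stand-in for the single UniqueId node; not a Neo4j type. */
    static final class Counter {
        long count = 1; // value left behind by the ON CREATE branch
    }

    /** Suspected behaviour: both transactions read before either one writes. */
    static long[] readBeforeLock(Counter node) {
        long seenByA = node.count;      // A reads 1
        long seenByB = node.count;      // B also reads 1, then waits for A's lock
        node.count = seenByA + 1;       // A writes 2 and commits
        long returnedToA = node.count;
        node.count = seenByB + 1;       // B gets the lock but reuses its stale read
        long returnedToB = node.count;
        return new long[] {returnedToA, returnedToB};
    }

    /** Desired behaviour: each transaction reads only once it holds the lock. */
    static long[] readAfterLock(Counter node) {
        long seenByA = node.count;      // A holds the lock and reads 1
        node.count = seenByA + 1;       // A writes 2 and commits
        long returnedToA = node.count;
        long seenByB = node.count;      // B now holds the lock and reads 2
        node.count = seenByB + 1;       // B writes 3
        long returnedToB = node.count;
        return new long[] {returnedToA, returnedToB};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(readBeforeLock(new Counter()))); // [2, 2]
        System.out.println(Arrays.toString(readAfterLock(new Counter())));  // [2, 3]
    }
}
```

The duplicate value in the first case is the kind of result my unit test is detecting.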
Code snippets:
The Node:
@NodeEntity
public class UniqueId {

    @GraphId
    private Long id;

    @Indexed(indexType=IndexType.LABEL, unique=true)
    private String type;

    private long count;

    //getters/setters/stuff
}
Repository:
public interface UniqueIdRepository extends GraphRepository<UniqueId> {

    @Query(value = "MERGE (nid:UniqueId{type:{0}}) " +
            "ON CREATE SET nid:_UniqueId, nid.count = 1 " +
            "ON MATCH SET nid.count = nid.count + 1 " +
            "RETURN nid.count")
    public long generateUid(String type);
}
Service Method:
@Override
@Transactional
public Long generateOrderId() {
    return idRepository.generateUid("Order");
}
Simple Unit Test:
@Test
public void testLots() throws Exception {
    final Map<Long, Object> ids = new ConcurrentHashMap<>();

    Thread t1 = new GenerateRunnable(200, ids);
    Thread t2 = new GenerateRunnable(200, ids);
    Thread t3 = new GenerateRunnable(200, ids);
    Thread t4 = new GenerateRunnable(200, ids);
    Thread t5 = new GenerateRunnable(200, ids);

    //Run method now just contains...
    // for (int i = 0; i < iterations; ++i) {
    //     Long uid = idService.generateOrderId();
    //     objects.put(uid, PRESENT);
    //     System.out.println(uid);
    // }

    t1.start();
    t2.start();
    t3.start();
    t4.start();
    t5.start();

    t1.join();
    t2.join();
    t3.join();
    t4.join();
    t5.join();

    assertThat(ids.size(), equalTo(1000));
}
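For anyone wanting to reproduce this without my project, the test above boils down to something like the following self-contained harness. It is only a sketch: UniqueIdHarness and countDistinct are hypothetical names, and the LongSupplier stands in for idService.generateOrderId(). A latch releases all workers at once to maximise contention, and the return value is the number of distinct ids observed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

final class UniqueIdHarness {

    /** Runs the generator from several threads and counts the distinct ids seen. */
    static int countDistinct(LongSupplier generator, int threads, int iterations) {
        Set<Long> ids = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);

        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    start.await(); // hold every worker until all are queued
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < iterations; i++) {
                    ids.add(generator.getAsLong());
                }
            });
        }

        start.countDown(); // release all workers together
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ids.size();
    }
}
```

With a generator that is known to be thread safe, such as AtomicLong::incrementAndGet, countDistinct(generator, 5, 200) returns 1000; with my repository-backed generator it would return fewer whenever duplicates are handed out.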
Whilst the run method is using the service at the minute, I have also called the repository directly, run the query using the SDN Neo4j template, and run it directly using the ExecutionEngine, in the latter cases manually creating and committing the transactions.
I did start a conversation in the Google Group, where Michael kindly helped me with a mistake in the query semantics, but I have brought the conversation over here as I believe that there is an error in my code or my understanding, rather than an issue with Neo4j itself.
If I could read Scala properly I might have stood a better chance of working this through myself, so I'll put that on my learning list. However, I note that when debugging I enter a wait state in TransactionBoundQueryContext$NodeOperations.setProperty (by this point having passed through the findNodes component of MergeNodeAction, acquiring and releasing locks as it goes, and now processing the onMatch.foreach(...) code at MergeNodeAction:86).
I have other mechanisms for generating this value, but this looked like a very neat solution. Am I doing something wrong?