I am using the Neo4j REST Batch API to insert relationships. The inserts come from multiple instances, and sometimes I get a BatchOperationFailedException with an empty message.
After investigating, I wrote a small program that reproduces the problem. It seems that the REST API doesn't handle concurrent requests correctly.
I created a console application that starts 10 threads: 5 threads execute the first query and the other 5 execute the second query.
The first query adds a relationship to node 11 and then to node 12. The second query adds a relationship to node 12 and then to node 11.
With this setup I get errors. BUT: if both queries use the same order (both add the relationship to node 11 and then to node 12, or both to node 12 and then to node 11), everything works perfectly.
The errors look like this:
    {
      "message" : "",
      "exception" : "BatchOperationFailedException",
      "fullname" : "org.neo4j.server.rest.domain.BatchOperationFailedException",
      "stacktrace" : [
        "org.neo4j.server.rest.batch.NonStreamingBatchOperations.invoke(NonStreamingBatchOperations.java:63)",
        "org.neo4j.server.rest.batch.BatchOperations.performRequest(BatchOperations.java:188)",
        "org.neo4j.server.rest.batch.BatchOperations.parseAndPerform(BatchOperations.java:159)",
        "org.neo4j.server.rest.batch.NonStreamingBatchOperations.performBatchJobs(NonStreamingBatchOperations.java:48)",
        "org.neo4j.server.rest.web.BatchOperationService.batchProcess(BatchOperationService.java:117)",
        "org.neo4j.server.rest.web.BatchOperationService.performBatchOperations(BatchOperationService.java:71)",
        "java.lang.reflect.Method.invoke(Unknown Source)"
      ]
    }
The queries that produce the errors:
1.
[{"id":1,"method":"POST","to":"/node","body":{"Label":" ","Type":"USER","CreationDate":1369404750,"UserId":"{UserId}"}},
{"id":2,"method":"POST","to":"{1}/relationships","body":{"to":"/node/11","type":"INTERACTS","data":{"Label":"Label1","CreationDate":1258421809}}},
{"id":3,"method":"POST","to":"{1}/relationships","body":{"to":"/node/12","type":"INTERACTS","data":{"Label":"Label2","CreationDate":1258421809}}}]
2.
[{"id":1,"method":"POST","to":"/node","body":{"Label":" ","Type":"USER","CreationDate":1369404750,"UserId":"{UserId}"}},
{"id":2,"method":"POST","to":"{1}/relationships","body":{"to":"/node/12","type":"INTERACTS","data":{"Label":"Label1","CreationDate":1258421809}}},
{"id":3,"method":"POST","to":"{1}/relationships","body":{"to":"/node/11","type":"INTERACTS","data":{"Label":"Label2","CreationDate":1258421809}}}]
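The two batches differ only in the order of their target nodes, and the errors disappear when the order matches. That would be consistent with a lock-ordering conflict on the server, although the empty message does not confirm it. Under that assumption, one possible workaround is to sort the relationship jobs by target node id before sending a batch. The sketch below is only an illustration: `BatchOrdering` and `BuildBatch` are hypothetical names, the payload is reduced to a few properties, and values are not JSON-escaped.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of any Neo4j client library.
public static class BatchOrdering
{
    // Builds a batch payload in which the relationship jobs are sorted by
    // target node id, so every batch touches shared nodes in the same order.
    // Each pair is (target node id, relationship label).
    public static string BuildBatch(string userId, IEnumerable<KeyValuePair<long, string>> relationships)
    {
        var jobs = new List<string>
        {
            "{\"id\":1,\"method\":\"POST\",\"to\":\"/node\"," +
            "\"body\":{\"Type\":\"USER\",\"UserId\":\"" + userId + "\"}}"
        };

        int id = 2;
        foreach (var rel in relationships.OrderBy(r => r.Key))
        {
            jobs.Add(
                "{\"id\":" + id + ",\"method\":\"POST\",\"to\":\"{1}/relationships\"," +
                "\"body\":{\"to\":\"/node/" + rel.Key + "\",\"type\":\"INTERACTS\"," +
                "\"data\":{\"Label\":\"" + rel.Value + "\"}}}");
            id++;
        }

        return "[" + string.Join(",", jobs) + "]";
    }
}
```

With this helper, a caller that passes the targets as (12, "Label1"), (11, "Label2") would get a payload that creates the relationship to node 11 first, the same order as a caller that passes them the other way round. Whether this removes the errors in every case is something I have not verified beyond the two-node test described here.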
And the program that reproduces the problem:
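    // Members of the console application's Program class. Namespaces used:
    // System, System.Collections.Generic, System.IO, System.Net, System.Text, System.Threading.
    static string text1 = null;
    static string text2 = null;

    static void Main(string[] args)
    {
        // 1.txt and 2.txt contain the two batch payloads shown above.
        text1 = File.ReadAllText(@"C:\git\PerformanceTest\Perf\1.txt");
        text2 = File.ReadAllText(@"C:\git\PerformanceTest\Perf\2.txt");

        var threads = new List<Thread>();
        for (int i = 0; i < 5; i++)
        {
            Thread thread1 = new Thread(InsertUsers1);
            threads.Add(thread1);
            thread1.Start();
        }
        for (int i = 0; i < 5; i++)
        {
            Thread thread2 = new Thread(InsertUsers2);
            threads.Add(thread2);
            thread2.Start();
        }

        // Wait for all worker threads to finish before exiting.
        foreach (Thread thread in threads)
        {
            thread.Join();
        }
    }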
    // Sends the first batch (node 11, then node 12) with a random UserId.
    private static void InsertUsers1()
    {
        //while (true)
        //{
        Random r = new Random((int)DateTime.Now.Ticks);
        string newCommand = text1.Replace("{UserId}", r.Next(1000000, 100000000).ToString());
        var request = (HttpWebRequest)WebRequest.Create("http://localhost:7474/db/data/batch");
        request.Method = "POST";
        request.ContentType = "application/json; charset=utf-8";
        request.Accept = "application/json";
        request.AutomaticDecompression = DecompressionMethods.GZip;
        //request.Timeout = _timeout;
        try
        {
            // Write the payload and close the request stream before reading the response.
            using (var stream = request.GetRequestStream())
            {
                var bytes = Encoding.UTF8.GetBytes(newCommand);
                stream.Write(bytes, 0, bytes.Length);
            }
            string queryResult = null;
            Console.WriteLine("1");
            using (var response = request.GetResponse())
            using (var reader = new StreamReader(response.GetResponseStream()))
            {
                queryResult = reader.ReadToEnd();
            }
            Thread.Sleep(r.Next(5));
        }
        catch (Exception e)
        {
            LogException("InsertBatch1", e);
            Console.WriteLine("e1");
        }
        //}
    }
    // Sends the second batch (node 12, then node 11) with a fixed UserId.
    private static void InsertUsers2()
    {
        //while (true)
        //{
        Random r = new Random((int)DateTime.Now.Ticks);
        string newCommand = text2.Replace("{UserId}", "46239");
        var request = (HttpWebRequest)WebRequest.Create("http://localhost:7474/db/data/batch");
        request.Method = "POST";
        request.ContentType = "application/json; charset=utf-8";
        request.Accept = "application/json";
        request.AutomaticDecompression = DecompressionMethods.GZip;
        //request.Timeout = _timeout;
        try
        {
            // Write the payload and close the request stream before reading the response.
            using (var stream = request.GetRequestStream())
            {
                var bytes = Encoding.UTF8.GetBytes(newCommand);
                stream.Write(bytes, 0, bytes.Length);
            }
            string queryResult = null;
            //Thread.Sleep(10);
            Console.WriteLine("2");
            using (var response = request.GetResponse())
            using (var reader = new StreamReader(response.GetResponseStream()))
            {
                queryResult = reader.ReadToEnd();
            }
            Thread.Sleep(r.Next(5));
        }
        catch (Exception e)
        {
            LogException("InsertBatch2", e);
            Console.WriteLine("e2");
        }
        //}
    }
    // Prints the error body returned by Neo4j when one is available.
    private static void LogException(string methodName, Exception ex, string request = null)
    {
        var webException = ex as WebException;
        if (webException != null)
        {
            try
            {
                using (var reader = new StreamReader(webException.Response.GetResponseStream()))
                {
                    string neoError = reader.ReadToEnd();
                    Console.WriteLine("Neo4jContext." + methodName + " - Request error " + neoError);
                }
            }
            catch
            {
                // No response body could be read (for example, the connection failed).
                Console.WriteLine("Neo4jContext." + methodName + " - Request error");
            }
        }
        else
        {
            Console.WriteLine("Neo4jContext." + methodName + " - Request error");
        }
    }
Question: has anybody experienced the same issue? How do you handle it? How can we insert relationships without concurrency problems while still getting good performance?
P.S.: The documentation on batch insertion (http://docs.neo4j.org/chunked/milestone/batchinsert.html) notes that batch insertion is not thread safe. But we use the REST API (http://docs.neo4j.org/chunked/milestone/rest-api-batch-ops.html), whose documentation says nothing about concurrency issues, so I assumed that everything should work fine.
P.P.S.: I have tested this on Neo4j 1.9 and 2.0.