I have experimented with ways to upload medium-size data sets using py2neo. In my case, there are about 80 K nodes and 400 K edges that need to be loaded every day. I want to share my experience, and ask the community if there is still a better way that I have not come across.

A. py2neo's "native" commands.

Create nodes using graph.merge_one() and set properties using push(). I dismissed this rather quickly, as it was very slow and did not get past 10 K records in several minutes. Not surprisingly, py2neo's documentation and some posts here recommend Cypher instead.

B. Cypher without partitioning

Use py2neo.cypher.CypherTransaction append() in the loop and commit() at the end.

# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query) 
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
# begin new Cypher transaction
tx = neoGraph.cypher.begin()
for row in result:
    tx.append(statement, {"ID": row.id_field})
tx.commit()

This times out and crashes the Neo4j server. I understand the problem is that all 80 K Cypher statements are trying to execute in one go.

C. Cypher with partitioning and one commit

I use a counter and process() command to run 1000 statements at a time.

# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query) 
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
counter = 0
tx = neoGraph.cypher.begin()
for row in result:
    counter += 1
    tx.append(statement, {"ID": row.id_field})
    if (counter == 1000):
        tx.process()    # process 1000 statements
        counter = 0
tx.commit()

This runs quickly in the beginning, but slows down after thousands of statements have been processed. Eventually, it times out with a stack overflow. This was surprising, as I expected process() to reset the stack every time.

D. Cypher with partitioning and commits for each partition

This is the only version that worked well. Call commit() for each partition of 1000 statements and start a new transaction with begin().

# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query) 
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
counter = 0
tx = neoGraph.cypher.begin()
for row in result:
    counter += 1
    tx.append(statement, {"ID": row.id_field})
    if (counter == 1000):
        tx.commit()                   # commit 1000 statements
        tx = neoGraph.cypher.begin()  # reopen transaction
        counter = 0
tx.commit()

This runs quickly and well.
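
For reference, the same commit-per-partition idea can be written without the manual counter. This is only a sketch: `chunked`, `merge_entities` and `batch_size` are hypothetical names I made up for illustration, and `graph` is assumed to be a py2neo Graph object that exposes the same cypher.begin() / append() / commit() calls used above.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def merge_entities(graph, ids, batch_size=1000):
    """MERGE one Entity node per id, committing one transaction per batch.

    `graph` is assumed to behave like the neoGraph object above
    (hypothetical wrapper; only begin/append/commit are used).
    Returns the number of statements committed.
    """
    statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
    committed = 0
    for batch in chunked(ids, batch_size):
        tx = graph.cypher.begin()      # fresh transaction per batch
        for ident in batch:
            tx.append(statement, {"ID": ident})
        tx.commit()                    # commit this batch only
        committed += len(batch)
    return committed
```

With the data above, the call would look something like merge_entities(neoGraph, (row.id_field for row in result)). Because a transaction is only opened when a batch is non-empty, this also avoids the final commit() on an empty transaction when the record count is an exact multiple of the batch size.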

Any comments?

1 Answer

As you have discovered through trial and error, a single transaction performs best when it has no more than about 10K-50K operations. The method you describe in D works best because you are committing the transaction every 1000 statements. You can probably increase that batch size safely.

Another approach that you might want to try is passing an array of values as a parameter and using Cypher's UNWIND clause to iterate over them. For example:

WITH {id_array} AS ids // something like [1,2,3,4,5,6]
UNWIND ids AS ident
MERGE (e:Entity {myid: ident})
SET e.p = 1
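
A rough sketch of how this might be driven from Python, reusing the begin() / append() / commit() calls from the question. The function name `merge_entities_unwind` and the `batch_size` default are hypothetical, `graph` is assumed to be the same kind of object as neoGraph above, and the right batch size is something you would have to measure. The `{id_array}` form is the older parameter syntax used in the question; newer Neo4j versions write it as `$id_array`.

```python
def merge_entities_unwind(graph, ids, batch_size=10000):
    """Send ids to Neo4j in slices, one UNWIND statement per slice.

    `graph` is assumed to expose cypher.begin() returning a transaction
    with append(statement, params) and commit(), as in the question.
    Returns the number of transactions committed.
    """
    statement = (
        "UNWIND {id_array} AS ident "
        "MERGE (e:Entity {myid: ident}) "
        "SET e.p = 1"
    )
    ids = list(ids)
    transactions = 0
    for start in range(0, len(ids), batch_size):
        batch = ids[start:start + batch_size]
        tx = graph.cypher.begin()
        # One statement carries the whole slice as a single list parameter.
        tx.append(statement, {"id_array": batch})
        tx.commit()
        transactions += 1
    return transactions
```

The point of the design is that each round trip carries one statement and one list parameter instead of thousands of separate statements, so the per-statement overhead is paid once per slice.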