0 votes

I use StatementResult result = session.run(cypherQuery) to run Cypher queries from Java. The program in question imports nodes from CSV files with LOAD CSV.

This error is thrown:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.logging.SimpleFormatter.format(Unknown Source)
at java.util.logging.StreamHandler.publish(Unknown Source)
at java.util.logging.ConsoleHandler.publish(Unknown Source)
at java.util.logging.Logger.log(Unknown Source)
at java.util.logging.Logger.doLog(Unknown Source)
at java.util.logging.Logger.log(Unknown Source)
at org.neo4j.driver.internal.logging.JULogger.warn(JULogger.java:54)
at org.neo4j.driver.internal.security.TLSSocketChannel.close(TLSSocketChannel.java:477)
at org.neo4j.driver.internal.net.SocketClient.stop(SocketClient.java:192)
at org.neo4j.driver.internal.net.SocketConnection.close(SocketConnection.java:260)
at org.neo4j.driver.internal.net.ConcurrencyGuardingConnection.close(ConcurrencyGuardingConnection.java:178)
at org.neo4j.driver.internal.net.pooling.PooledSocketConnection.dispose(PooledSocketConnection.java:251)
at org.neo4j.driver.internal.net.pooling.PooledConnectionReleaseConsumer.accept(PooledConnectionReleaseConsumer.java:50)
at org.neo4j.driver.internal.net.pooling.PooledConnectionReleaseConsumer.accept(PooledConnectionReleaseConsumer.java:29)
at org.neo4j.driver.internal.net.pooling.PooledSocketConnection.close(PooledSocketConnection.java:200)
at org.neo4j.driver.internal.NetworkSession.closeCurrentConnection(NetworkSession.java:385)
at org.neo4j.driver.internal.NetworkSession.syncAndCloseCurrentConnection(NetworkSession.java:359)
at org.neo4j.driver.internal.NetworkSession.run(NetworkSession.java:102)
at org.neo4j.driver.internal.NetworkSession.run(NetworkSession.java:93)
at org.neo4j.driver.internal.NetworkSession.run(NetworkSession.java:73)

Is there any way to work around this error, or is there no option other than importing fewer nodes?

The whole Java code:

import java.io.File;
import java.io.FileNotFoundException;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import java.util.List;
import java.util.Scanner;
import java.util.logging.Logger;


public class ImporterImprovedVersion {
    private static String path = "file:///C:/csvsToImport/csvs";
    private int num;

    private boolean headerPresent(String filePath) throws FileNotFoundException{
        Scanner scanner = new Scanner(new File(filePath));
        if(scanner.hasNext()){
            String firstLine = scanner.nextLine();
            scanner.close();

            if(filePath.contains("address")){
                System.out.println("address! " + this.num + " " + firstLine.contains("addr_tag_links"));
                return firstLine.contains("addr_tag_links");
            }else if (filePath.contains("transaction")){
                System.out.println("transaction! "  + this.num + " " + firstLine.contains("value_bitcoin"));
                return firstLine.contains("value_bitcoin");
            }else{
                System.out.println("wallet! "  + this.num + " " + firstLine.contains("primAddress,firstSeenTime"));
                return firstLine.contains("primAddress,firstSeenTime");
            }           
        }
        scanner.close();
        return false;

    }



    public ImporterImprovedVersion(int num, boolean withOutputIndex, Session session, String mainDir) throws FileNotFoundException{
        this.num = num;

        //import csvs and create corresponding nodes
        String folderPath = ImporterImprovedVersion.path + num + "/";
        System.out.println(folderPath);


        String queryAddr = null;
        if(this.headerPresent(mainDir + num + "\\addresses.csv")){
            queryAddr = "LOAD CSV From '" + folderPath + "addresses.csv' AS line"
                    + " WITH line"
                    + " SKIP 1";            
        }else{
            queryAddr = "LOAD CSV From '" + folderPath + "addresses.csv' AS line";      
        }
        String queryWallet = null;
        if(this.headerPresent(mainDir + num + "\\wallet.csv")){
            queryWallet = "LOAD CSV From '" + folderPath + "wallet.csv' AS line"
                    + " WITH line"
                    + " SKIP 1";            
        }else{
            queryWallet = "LOAD CSV From '" + folderPath + "wallet.csv' AS line";
        }

        File tranFile = new File(mainDir + num + "\\transactionRelation.csv");
        String queryTran = null;
        System.out.println(tranFile);
        System.out.println(tranFile.exists());
        if(tranFile.exists()){
            if(this.headerPresent(mainDir + num + "\\transactionRelation.csv")){
                queryTran = "LOAD CSV From '" + folderPath + "transactionRelation.csv' AS line"
                        + " WITH line"
                        + " SKIP 1";                    
            }else{
                queryTran = "LOAD CSV From '" + folderPath +  "transactionRelation.csv' AS line";                   
            }
        }else{
            if(this.headerPresent(mainDir + num + "\\transactionRelation1.csv")){
                queryTran = "LOAD CSV From '" + folderPath +  "transactionRelation1.csv' AS line"
                        + " WITH line"
                        + " SKIP 1";                    
            }else{
                queryTran = "LOAD CSV From '" + folderPath + "transactionRelation1.csv' AS line";                   
            }
        }

        // create Wallet nodes from wallet.csv
        StatementResult sR = session.run(queryWallet
                + " MERGE (w:Wallet { primWallAddr:line[0]})"
                + " ON CREATE SET w.first_seen = line[1], w.last_seen=line[2]");

        System.out.println("Wallet nodes created");

        // create Address nodes and their BelongTo relationships to wallets
        sR = session.run(queryAddr + " MERGE (a:Address {AddId:(line[0])})"
                + " SET a.addr_tag_link= CASE WHEN a.addr_tag_link = 'null' or a.addr_tag_link IS NULL THEN line[1] ELSE a.addr_tag_link END,"
                + " a.addr_tag= CASE WHEN a.addr_tag = 'null' or a.addr_tag IS NULL THEN line[2] ELSE a.addr_tag END,"
                + " a.first_seen=line[3], a.last_seen=line[4], a.multiExist= a.multiExist OR apoc.convert.toBoolean(line[6])"
                + " WITH a, line[5] as li5" 
                + " MATCH (a), (wa:Wallet) WHERE li5=wa.primWallAddr"
                + " MERGE (a)-[r:BelongTo{uniqueReferenceBelongTo:(a.AddId + wa.primWallAddr)}]->(wa)"
                + " ON CREATE SET r.primWallAddr = wa.primWallAddr,"
                + " wa.first_seen =  CASE WHEN apoc.date.parse(wa.first_seen, 's',\"yyyy-MM-dd'T'HH:mm:ss\") < apoc.date.parse(a.first_seen, 's',\"yyyy-MM-dd'T'HH:mm:ss\") THEN wa.first_seen ELSE a.first_seen END,"
                + " wa.last_seen =  CASE WHEN apoc.date.parse(wa.last_seen, 's',\"yyyy-MM-dd'T'HH:mm:ss\") > apoc.date.parse(a.last_seen, 's',\"yyyy-MM-dd'T'HH:mm:ss\") THEN wa.last_seen ELSE a.last_seen END");

        System.out.println("Address nodes created");
        System.out.println("BelongTo rel created");



        // create SendTo relationships between wallets from the transaction CSV
        if(withOutputIndex){
            sR = session.run(queryTran 
                    + " MATCH (senderAddress:Address {AddId:line[0]}), (senderAddress)-[:BelongTo]->(sender:Wallet),"
                    + " (receiverAddress:Address {AddId:line[1]}), (receiverAddress)-[:BelongTo]->(receiver:Wallet)"
                    + " MERGE (sender)-[r:SendTo{uniqueReferenceTran:(line[2] + line[8])}]->(receiver)"
                    + " ON CREATE SET r.tranHashString=line[2],r.time=line[3],r.value_bitcoin=line[4],"
                    + "r.value_dollar=line[5],r.type=line[6],r.estChanAddr=line[7],r.outputIndex=line[8]");
        }else{
            System.out.println("withoutOutputIndex");
            sR = session.run(queryTran 
                    + " MATCH (senderAddress:Address {AddId:line[0]}), (senderAddress)-[:BelongTo]->(sender:Wallet),"
                    + " (receiverAddress:Address {AddId:line[1]}), (receiverAddress)-[:BelongTo]->(receiver:Wallet)"
                    + " MERGE (sender)-[r:SendTo{uniqueReferenceTran:(line[2] + 'default')}]->(receiver)"
                    + " ON CREATE SET r.tranHashString=line[2],r.time=line[3],r.value_bitcoin=line[4],"
                    + " r.value_dollar=line[5],r.type=line[6],r.estChanAddr=line[7],r.outputIndex='default'");          
        }

        sR.list();
        System.out.println("Tran rel created");


        // count addresses that belong to more than one wallet
        String queryAddrDiffWallet = "MATCH  (a:Address)-[:BelongTo]->(w1:Wallet), (a)-[r0:BelongTo]->(w2:Wallet)"
                + " WHERE w1 <> w2" 
                + " RETURN count(a)";
        sR = session.run(queryAddrDiffWallet);

        List<Record> records = sR.list();


        System.out.println(records.get(0).get("count(a)").asInt());
        System.out.println(records.get(0).get("count(a)").asInt() == 0);
        if(records.get(0).get("count(a)").asInt() != 0){
            sR = session.run("MATCH  (a:Address)-[:BelongTo]->(w1:Wallet), (a)-[r0:BelongTo]->(w2:Wallet)"
                    + " WHERE w1 <> w2" 
                    + " RETURN a");


        }

        String queryMergeWallet = "MATCH  (a:Address)-[:BelongTo]->(w1:Wallet)"
                + " WITH DISTINCT a, min(ID(w1)) as minId" 
                + " MATCH (minW:Wallet)"
                + " WHERE ID(minW) = minId"
                + " SET a.primWallAddr = minW.primWallAddr"
                + " WITH DISTINCT minW, a"
                + " MATCH (a)-[r0:BelongTo]->(w2:Wallet)"
                + " WHERE minW <> w2"

                + " WITH DISTINCT minW, w2, r0"
                + " SET minW.first_seen = CASE WHEN apoc.date.parse(minW.first_seen, 's',\"yyyy-MM-dd'T'HH:mm:ss\") < apoc.date.parse(w2.first_seen, 's',\"yyyy-MM-dd'T'HH:mm:ss\") THEN minW.first_seen ELSE w2.first_seen END,"
                + " minW.last_seen = CASE WHEN apoc.date.parse(minW.last_seen, 's',\"yyyy-MM-dd'T'HH:mm:ss\") > apoc.date.parse(w2.last_seen, 's',\"yyyy-MM-dd'T'HH:mm:ss\") THEN minW.last_seen ELSE w2.last_seen END"
                + " WITH DISTINCT minW, w2, r0"
                + " DELETE r0"
                + " WITH DISTINCT minW, w2"
                + " MATCH (b:Address)-[r:BelongTo]->(w2)"
                + " WITH DISTINCT r, minW, w2, b"  
                + " DELETE r"       
                + " WITH DISTINCT b, minW, w2"
                + " MERGE (b)-[be:BelongTo{uniqueReferenceBelongTo:(b.AddId + minW.primWallAddr)}]->(minW)"
                + " ON CREATE SET be.primWallAddr = minW.primWallAddr"

                + " WITH DISTINCT w2, minW"
                + " MATCH (w3:Wallet)-[r2:SendTo]->(w2)"

                + " MERGE (w3)-[rN:SendTo{uniqueReferenceTran:r2.uniqueReferenceTran}]->(minW)"
                + " ON CREATE SET rN.tranHashString=r2.tranHashString,rN.time=r2.time,"
                + "rN.value_bitcoin=r2.value_bitcoin,rN.value_dollar=r2.value_dollar,"
                + "rN.type=r2.type,rN.estChanAddr=r2.estChanAddr,rN.outputIndex=r2.outputIndex"

                + " WITH DISTINCT w2, minW, r2"
                + " DELETE r2"
                + " WITH DISTINCT w2, minW"
                + " MATCH (w2)-[r1:SendTo]->(w4:Wallet)"
                + " MERGE (minW)-[rN2:SendTo{uniqueReferenceTran:r1.uniqueReferenceTran}]->(w4)"
                + " ON CREATE SET rN2.tranHashString=r1.tranHashString,rN2.time=r1.time,"
                + "rN2.value_bitcoin=r1.value_bitcoin,rN2.value_dollar=r1.value_dollar,"
                + "rN2.type=r1.type,rN2.estChanAddr=r1.estChanAddr,rN2.outputIndex=r1.outputIndex"
                + " WITH DISTINCT r1"
                + " DELETE r1"
                ;
        if(records.get(0).get("count(a)").asInt() != 0){
        //merge any wallet containing same addresses and adjust the transaction as well (w2 merge to w1)
            try{
                sR = session.run(queryMergeWallet);                 
            }catch(Exception e){
                e.printStackTrace();
                try {
                    Thread.sleep(20000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                sR = session.run(queryMergeWallet);                     
            }
        }
        System.out.println("same wallet merged");



        System.out.println("----------abc----------------------" + num);





        //delete all wallets with no addresses  
        sR = session.run(
                " MATCH (n:Wallet)"
                + " WHERE NOT ()-[:BelongTo]->(n)"
                + " DETACH DELETE n"
        );
        System.out.println("Wall with no addr deleted");


    }
    public static void main(String[] args) throws FileNotFoundException{
        Driver driver = GraphDatabase.driver( "bolt://localhost:7687", AuthTokens.basic( "neo4j", "neo7474" ) );
        try(Session session = driver.session()){
            String mainDir = "C:\\Program Files\\neo4j-enterprise-3.1.1\\import\\csvsToImport\\csvs";
            // create uniqueness constraints (these also create backing indexes)
            session.run( "CREATE CONSTRAINT ON (addr:Address) ASSERT addr.AddId IS UNIQUE");

            session.run( "CREATE CONSTRAINT ON (wa:Wallet) ASSERT wa.primWallAddr IS UNIQUE"); 


            System.out.println("aaa");
            int counter = 537;
            while(counter < 660){
                File dir = new File(mainDir + counter);

                if(dir.exists()){
                    System.out.println("aaa");
                    ImporterImprovedVersion a = new ImporterImprovedVersion(counter, counter > 260, session, mainDir);              
                }
                counter ++;

            }       
        }
    }
}

I imported many rows of nodes and relationships. By the time the exception is thrown, the graph has 184448 relationships (82179 of them with 8 properties) and 117314 nodes (69714 of them with 6 properties). Could the number of nodes and relationships, together with their properties, be the reason?

You could possibly have a memory leak somewhere in your program; it would be better to show us the code. – Jacob G.
Which version of the driver do you use? – Michael Hunger
How much data is returned here: sR = session.run(queryAddrDiffWallet); List<Record> records = sR.list(); – Michael Hunger
If you're only interested in the first record, you should just get the first record and otherwise call sR.consume() for the rest. – Michael Hunger
Also, is there anything missing from the stack trace? It shows only infrastructure code. – Michael Hunger
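
As the comments suggest, sR.list() buffers every record of a result in memory. A minimal sketch of the alternative, using only the driver 1.x API and the variables (session, queryAddrDiffWallet, queryMergeWallet) that already appear in the code above:

// Only the count is needed, so read the single record instead of listing everything.
StatementResult countResult = session.run(queryAddrDiffWallet);
int count = countResult.single().get("count(a)").asInt();

// For statements whose records are never read, consume the result so nothing is buffered.
session.run(queryMergeWallet).consume();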

2 Answers

1 vote

Did you try to increase the heap size in the JVM (-Xmx option)? The CSVs you are trying to import may be too big.
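
For example, assuming the importer is started from the command line, the heap limit could be raised like this (the 4g value and the driver jar name are only illustrative):

java -Xmx4g -cp ".;neo4j-java-driver-1.x.jar" ImporterImprovedVersion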

1 vote

LOAD CSV has built-in batching. So whenever you run out of memory using LOAD CSV, use USING PERIODIC COMMIT. Read more in the documentation.

Example:

USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///customers.csv" AS row
CREATE (:Customer {companyName: row.CompanyName})
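
Applied to the code in the question, that would mean prefixing each generated statement, roughly like this (10000 is just the batch size from the example above; this relies on the statements being run as auto-commit transactions, which session.run does here):

// prefix each LOAD CSV statement so the import commits in batches
String queryWallet = "USING PERIODIC COMMIT 10000"
        + " LOAD CSV FROM '" + folderPath + "wallet.csv' AS line";
// the existing header handling (WITH line SKIP 1) can stay unchanged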