
I am working on a StreamSets pipeline that reads data from an active file directory, where .csv files are uploaded remotely, and loads that data into a Neo4j database. The steps I have used are:

  • Creating an OBSERVATION node for each row in the .csv
  • Creating a CSV node and creating a relationship between the CSV node and the record
  • Updating the timestamp taken from the CSV node onto the Burn_In_Test nodes (already created in the graph database by a different pipeline), if it is newer
  • Creating a relationship from the CSV node to the Burn_In_Test node
  • Deleting outdated relationships based on the latest timestamp

Now I am doing all of this with a single JDBC query, and the Cypher query used is:

MERGE (m:OBSERVATION{
  SerialNumber: "${record:value('/SerialNumber')}",
  Test_Stage: "${record:value('/Test_Stage')}",
  CUR: "${record:value('/CUR')}",
  VOLT: "${record:value('/VOLT')}",
  Rel_Lot: "${record:value('/Rel_Lot')}",
  TimestampINT: "${record:value('/TimestampINT')}",     
  Temp: "${record:value('/Temp')}",
  LP: "${record:value('/LP')}",
  MON: "${record:value('/MON')}"
})       
MERGE (t:CSV{
       SerialNumber: "${record:value('/SerialNumber')}",
       Test_Stage: "${record:value('/Test_Stage')}",
       TimestampINT: "${record:value('/TimestampINT')}"
})  
WITH m
MATCH (t:CSV) where t.SerialNumber=m.SerialNumber and t.Test_Stage=m.Test_Stage and t.TimestampINT=m.TimestampINT MERGE (m)-[:PART_OF]->(t)

WITH t, t.TimestampINT AS TimestampINT
MATCH (rl:Burn_In_Test) where rl.SerialNumber=t.SerialNumber and rl.Test_Stage=t.Test_Stage and rl.TimestampINT<TimestampINT
SET rl.TimestampINT=TimestampINT     
WITH t 
MATCH (rl:Burn_In_Test) where rl.SerialNumber=t.SerialNumber and rl.Test_Stage=t.Test_Stage 
MERGE (t)-[:POINTS_TO]->(rl)
WITH rl
MATCH (t:CSV)-[r:POINTS_TO]->(rl) WHERE t.TimestampINT<rl.TimestampINT
DELETE r

Right now this process is very slow, taking about 15 minutes for 10 records. Can this be optimized further?

Please PROFILE a run of the query and add the query plan (with all elements expanded) to your question. – InverseFalcon

I don't think StreamSets has an option to profile a query. – Akash Bhattacharyya

1 Answer


Best practice when using MERGE is to merge on a single property (ideally one backed by a unique constraint) and then use SET to add the other properties.

If I assume that the serial number property is unique for every node (it might not be), it would look like:

MERGE (m:OBSERVATION{SerialNumber: "${record:value('/SerialNumber')}"})
SET m.Test_Stage = "${record:value('/Test_Stage')}",
    m.CUR= "${record:value('/CUR')}",
    m.VOLT= "${record:value('/VOLT')}",
    m.Rel_Lot= "${record:value('/Rel_Lot')}",
    m.TimestampINT = "${record:value('/TimestampINT')}",     
    m.Temp= "${record:value('/Temp')}",
    m.LP= "${record:value('/LP')}",
    m.MON= "${record:value('/MON')}"       
MERGE (t:CSV{
       SerialNumber: "${record:value('/SerialNumber')}"       
})
SET t.Test_Stage = "${record:value('/Test_Stage')}",
    t.TimestampINT = "${record:value('/TimestampINT')}"  
WITH m, t
MERGE (m)-[:PART_OF]->(t)

WITH t, t.TimestampINT AS TimestampINT
MATCH (rl:Burn_In_Test) where rl.SerialNumber=t.SerialNumber and rl.Test_Stage=t.Test_Stage and rl.TimestampINT<TimestampINT
SET rl.TimestampINT=TimestampINT     
WITH t 
MATCH (rl:Burn_In_Test) where rl.SerialNumber=t.SerialNumber and rl.Test_Stage=t.Test_Stage 
MERGE (t)-[:POINTS_TO]->(rl)
WITH rl
MATCH (t:CSV)-[r:POINTS_TO]->(rl) WHERE t.TimestampINT<rl.TimestampINT
DELETE r

Another thing to add: I would probably split this into two queries. The first would do the importing, and the second would delete the outdated relationships. Also add unique constraints and indexes where possible.
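As a sketch of that last point, the constraints and index could look like the following. This assumes SerialNumber really is the lookup key and uses Neo4j 4.x syntax; the constraint names are illustrative, and on older versions the wording differs (3.x uses `CREATE CONSTRAINT ON (m:OBSERVATION) ASSERT m.SerialNumber IS UNIQUE`, and 4.0–4.3 use `ASSERT` instead of `REQUIRE`):

```cypher
// Illustrative names; syntax shown for Neo4j 4.4+.
// Unique constraints make the MERGE lookups index-backed.
CREATE CONSTRAINT observation_serial IF NOT EXISTS
FOR (m:OBSERVATION) REQUIRE m.SerialNumber IS UNIQUE;

CREATE CONSTRAINT csv_serial IF NOT EXISTS
FOR (t:CSV) REQUIRE t.SerialNumber IS UNIQUE;

// Composite index to speed up the Burn_In_Test lookups,
// which filter on SerialNumber and Test_Stage together.
CREATE INDEX burn_in_serial_stage IF NOT EXISTS
FOR (rl:Burn_In_Test) ON (rl.SerialNumber, rl.Test_Stage);
```

With these in place, each MERGE and MATCH on SerialNumber becomes an index seek instead of a label scan, which is usually the single biggest win for a per-record query like this.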