I am working on a StreamSets pipeline that reads data from an active file directory, where .csv files are uploaded remotely, and writes that data into a Neo4j database. The steps I have used are:
- Creating an OBSERVATION node for each row in the .csv
- Creating a CSV node and a relationship between the CSV node and each record
- Updating the timestamp taken from the CSV node onto the Burn_In_Test nodes (already created in the graph database by a different pipeline) if it is newer
- Creating a relationship from the CSV node to the Burn_In_Test node
- Deleting the outdated relationship based on the latest timestamp
Currently I am doing all of this with a single JDBC query, and the Cypher query used is:
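// Create an OBSERVATION node for each row of the .csv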
MERGE (m:OBSERVATION{
SerialNumber: "${record:value('/SerialNumber')}",
Test_Stage: "${record:value('/Test_Stage')}",
CUR: "${record:value('/CUR')}",
VOLT: "${record:value('/VOLT')}",
Rel_Lot: "${record:value('/Rel_Lot')}",
TimestampINT: "${record:value('/TimestampINT')}",
Temp: "${record:value('/Temp')}",
LP: "${record:value('/LP')}",
MON: "${record:value('/MON')}"
})
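// Create a CSV node for this row's SerialNumber / Test_Stage / TimestampINT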
MERGE (t:CSV{
SerialNumber: "${record:value('/SerialNumber')}",
Test_Stage: "${record:value('/Test_Stage')}",
TimestampINT: "${record:value('/TimestampINT')}"
})
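// Link the OBSERVATION to its CSV node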
WITH m
MATCH (t:CSV)
WHERE t.SerialNumber=m.SerialNumber AND t.Test_Stage=m.Test_Stage AND t.TimestampINT=m.TimestampINT
MERGE (m)-[:PART_OF]->(t)
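// Push the CSV timestamp onto the matching Burn_In_Test node if it is newer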
WITH t, t.TimestampINT AS TimestampINT
MATCH (rl:Burn_In_Test)
WHERE rl.SerialNumber=t.SerialNumber AND rl.Test_Stage=t.Test_Stage AND rl.TimestampINT<TimestampINT
SET rl.TimestampINT=TimestampINT
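// Link the CSV node to the Burn_In_Test node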
WITH t
MATCH (rl:Burn_In_Test)
WHERE rl.SerialNumber=t.SerialNumber AND rl.Test_Stage=t.Test_Stage
MERGE (t)-[:POINTS_TO]->(rl)
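// Delete outdated POINTS_TO relationships based on the latest timestamp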
WITH rl
MATCH (t:CSV)-[r:POINTS_TO]->(rl) WHERE t.TimestampINT<rl.TimestampINT
DELETE r
Right now this process is very slow, taking about 15 minutes for 10 records. Can this be optimized further?
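If missing indexes are part of the problem (an assumption on my side, not something I have checked), this is roughly what I understand they would look like, using Neo4j 4.x+ syntax (older 3.x versions use CREATE INDEX ON :Label(property) instead); the index names are just placeholders:

// Assumption: no indexes exist yet on the lookup properties used above
CREATE INDEX observation_lookup IF NOT EXISTS
FOR (o:OBSERVATION) ON (o.SerialNumber, o.Test_Stage, o.TimestampINT);

CREATE INDEX csv_lookup IF NOT EXISTS
FOR (c:CSV) ON (c.SerialNumber, c.Test_Stage, c.TimestampINT);

CREATE INDEX burn_in_test_lookup IF NOT EXISTS
FOR (b:Burn_In_Test) ON (b.SerialNumber, b.Test_Stage);

Would indexes like these (or some other change to the query or pipeline structure) be the right way to speed this up?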