I am using Neo4j/Cypher and my data is about 200 GB, so I was looking for a scalable solution and found Spark.
Two solutions are available for building Neo4j graphs with Spark:
1) Cypher for Apache Spark (CAPS)
2) Neo4j-Spark-Connector
I used the first one, CAPS. The pre-processed CSV contains two "geohash" values per row: one for the pickup and another for the drop-off. What I want is to build a connected graph of geohash nodes.
CAPS only lets you build a graph by mapping nodes and relationships: if the node with id 0 is to be connected to the node with id 1, you need a relationship with start id 0 and end id 1.
A very simple layout would be:
Nodes: (just id, no properties)
id
0
1
2
Relationships: (just the mandatory fields)
id | start | end
0 | 0 | 1
1 | 0 | 2
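For example, as far as I understand the CAPS table API, that minimal layout would be expressed with two DataFrames plus the mapping classes like this (a sketch only; the column names "id", "start" and "end" and the label/type names are my own choice):

import org.apache.spark.sql.DataFrame
import org.opencypher.okapi.api.io.conversion.{NodeMapping, RelationshipMapping}
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.{CAPSNodeTable, CAPSRelationshipTable}

implicit val session: CAPSSession = CAPSSession.local()
val spark = session.sparkSession
import spark.implicits._

// nodes: only an id column, no properties
val nodeDf: DataFrame = Seq(0L, 1L, 2L).toDF("id")
// relationships: id, start node id, end node id
val relDf: DataFrame = Seq((0L, 0L, 1L), (1L, 0L, 2L)).toDF("id", "start", "end")

val nodeTable = CAPSNodeTable(
  NodeMapping.withSourceIdKey("id").withImpliedLabel("Node"), nodeDf)
val relTable = CAPSRelationshipTable(
  RelationshipMapping.withSourceIdKey("id")
    .withSourceStartNodeKey("start")
    .withSourceEndNodeKey("end")
    .withRelType("REL"), relDf)

val tinyGraph = session.readFrom(nodeTable, relTable)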
Based on that, I loaded my CSV into a Spark DataFrame and then split it into:
Pickup DataFrame
Drop-off DataFrame
Trip DataFrame
I generated an id for the first two DataFrames and created the mapping by adding columns to the third one. The result: one pair of nodes (pickup)-[:TRIP]->(drop-off) is generated for each mapped row; a simplified sketch of this step is shown below.
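In simplified form (using placeholder geohash column names instead of the real _cNN columns and dropping the other trip properties), the id generation and mapping I did looks roughly like this:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// toy version of the pre-processed CSV: one pickup and one drop-off geohash per trip
val trips = Seq(("abc1", "abc2"), ("abc3", "abc1")).toDF("pickup_geohash", "dropoff_geohash")

// give pickup rows and drop-off rows ids in disjoint ranges
val pickupNodes  = trips.select("pickup_geohash").withColumn("id1", monotonically_increasing_id())
val dropoffNodes = trips.select("dropoff_geohash").withColumn("id2", monotonically_increasing_id() + trips.count())

// re-align the two halves row by row to obtain one (id1, id2) pair per trip
val tripRels = pickupNodes.withColumn("idj", monotonically_increasing_id())
  .join(dropoffNodes.withColumn("idj", monotonically_increasing_id()), "idj")

// note: since the ids are generated per row, the same geohash value occurring in several
// trips ends up as several different node ids, which is exactly the problem described below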
The problems I ran into are:
1) The geohash of a pickup or a drop-off can be repeated across different trips, so I want to merge the creation of those nodes (one node per distinct geohash).
2) The drop-off of one trip can be the pickup of another trip, so those two nodes also need to be merged into one.
I tried to modify the graph afterwards, but I was surprised to find that Spark (CAPS) graphs are immutable, so you cannot apply Cypher queries to change them.
So, is there a way to build a connected, directed and merged geohash graph with Spark?
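To make concrete what I mean by "merged": in plain Spark terms I think I need something like the following before handing the tables to CAPS, i.e. one node per distinct geohash, shared between pickups and drop-offs (again only a sketch with placeholder column names; I don't know whether this is the right way to do it with CAPS):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// toy version of the trip data: one pickup and one drop-off geohash per row
val trips = Seq(("abc1", "abc2"), ("abc3", "abc1")).toDF("pickup_geohash", "dropoff_geohash")

// 1) every geohash that occurs as a pickup OR a drop-off, deduplicated, with one id each
val geohashNodes = trips.select(col("pickup_geohash").as("geohash"))
  .union(trips.select(col("dropoff_geohash").as("geohash")))
  .distinct()
  .withColumn("id", monotonically_increasing_id())

// 2) resolve each trip's start and end node ids by joining back on the geohash value
val tripRels = trips
  .join(geohashNodes.withColumnRenamed("geohash", "pickup_geohash").withColumnRenamed("id", "start"), "pickup_geohash")
  .join(geohashNodes.withColumnRenamed("geohash", "dropoff_geohash").withColumnRenamed("id", "end"), "dropoff_geohash")
  .withColumn("id", monotonically_increasing_id())

// geohashNodes and tripRels could then be mapped with NodeMapping / RelationshipMapping as above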
This is my code:
package org.opencypher.spark.examples
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.{CAPSNodeTable, CAPSRelationshipTable}
import org.opencypher.spark.util.ConsoleApp
import java.net.URI
import org.opencypher.okapi.api.io.conversion.NodeMapping
import org.opencypher.okapi.api.io.conversion.RelationshipMapping
import org.opencypher.spark.api.io.neo4j.Neo4jPropertyGraphDataSource
import org.opencypher.spark.api.io.neo4j.Neo4jConfig
import org.apache.spark.sql.functions._
import org.opencypher.okapi.api.graph.GraphName
object GreenCabsInputDataFrames extends ConsoleApp {
//1) Create CAPS session and retrieve Spark session
implicit val session: CAPSSession = CAPSSession.local()
val spark = session.sparkSession
//2) Load a csv into dataframe
val df=spark.read.csv("C:\\Users\\Ahmed\\Desktop\\green_result\\green_data.csv").select("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19","_c20","_c21","_c22","_c23")
//3) cache the dataframe
val df1=df.cache()
//4) subset the dataframe
val pickup_dataframe=df1.select("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19","_c20","_c21")
val dropoff_dataframe=df1.select("_c22","_c23")
//5) uncache the dataframe
df1.unpersist()
//6) add id columns to the pickup, dropoff and trip dataframes
val pickup_dataframe2 = pickup_dataframe.withColumn("id1", monotonically_increasing_id() + pickup_dataframe.count()).select("id1", pickup_dataframe.columns: _*)
val dropoff_dataframe2 = dropoff_dataframe.withColumn("id2", monotonically_increasing_id() + pickup_dataframe2.count() + pickup_dataframe.count()).select("id2", dropoff_dataframe.columns: _*)
//7) create the relationship "TRIP" as a dataframe
val trip_data_dataframe2 = pickup_dataframe2.withColumn("idj", monotonically_increasing_id()).join(dropoff_dataframe2.withColumn("idj", monotonically_increasing_id()), "idj")
//drop unnecessary columns
val pickup_dataframe3=pickup_dataframe2.drop("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19")
val trip_data_dataframe3=trip_data_dataframe2.drop("_c20","_c21","_c22","_c23")
//8) reordering the columns of trip dataframe
val trip_data_dataframe4=trip_data_dataframe3.select("idj", "id1", "id2", "_c0", "_c10", "_c11", "_c12", "_c13", "_c14", "_c15", "_c16", "_c17", "_c18", "_c19", "_c3", "_c4","_c9")
//8.1) display the dataframes in the console
pickup_dataframe3.show()
dropoff_dataframe2.show()
trip_data_dataframe4.show()
//9) mapping the columns
val Pickup_mapping=NodeMapping.withSourceIdKey("id1").withImpliedLabel("HashNode").withPropertyKeys("_c21","_c20")
val Dropoff_mapping=NodeMapping.withSourceIdKey("id2").withImpliedLabel("HashNode").withPropertyKeys("_c23","_c22")
val Trip_mapping=RelationshipMapping.withSourceIdKey("idj").withSourceStartNodeKey("id1").withSourceEndNodeKey("id2").withRelType("TRIP").withPropertyKeys("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19")
//10) create tables
val Pickup_Table2 = CAPSNodeTable(Pickup_mapping, pickup_dataframe3)
val Dropoff_Table = CAPSNodeTable(Dropoff_mapping, dropoff_dataframe2)
val Trip_Table = CAPSRelationshipTable(Trip_mapping,trip_data_dataframe4)
//11) Create graph
val graph = session.readFrom(Pickup_Table2,Dropoff_Table, Trip_Table)
//12) Connect to Neo4j
val boltWriteURI: URI = new URI("bolt://localhost:7687")
val neo4jWriteConfig: Neo4jConfig = new Neo4jConfig(boltWriteURI, "neo4j", Some("wakarimashta"), true)
val neo4jResult: Neo4jPropertyGraphDataSource = new Neo4jPropertyGraphDataSource(neo4jWriteConfig)(session)
//13) Store graph in neo4j
val neo4jResultName: GraphName = new GraphName("neo4jgraphs151")
neo4jResult.store(neo4jResultName, graph)
}