
I am using Neo4j/Cypher. My data is about 200 GB, so I looked for a scalable solution: Spark.

Two solutions are available for building Neo4j graphs with Spark:

1) Cypher for Apache Spark (CAPS)

2) Neo4j-Spark-Connector

I used the first one, CAPS. Each row of the pre-processed CSV contains two geohash values: one for the pickup and one for the drop-off. What I want is a connected graph of geohash nodes.

CAPS only lets you build a graph by mapping nodes: if the node with id 0 is to be connected to the node with id 1, you need a relationship with start id 0 and end id 1.

A very simple layout would be:

Nodes: (just id, no properties)

id
0
1
2

Relationships: (just the mandatory fields)

id | start | end
0  | 0     | 1
1  | 0     | 2
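
As a minimal sketch of that layout (plain Scala collections, not CAPS code; `NodeRow`, `RelRow` and `LayoutCheck` are hypothetical names used only for illustration), the two tables can be modelled as rows, with a check that every relationship points at an existing node id:

```scala
// Hypothetical row types mirroring the two tables above.
final case class NodeRow(id: Long)
final case class RelRow(id: Long, startId: Long, endId: Long)

object LayoutCheck {
  // Returns the relationships whose start or end id has no matching node row.
  def dangling(nodes: Seq[NodeRow], rels: Seq[RelRow]): Seq[RelRow] = {
    val known = nodes.map(_.id).toSet
    rels.filterNot(r => known(r.startId) && known(r.endId))
  }
}
```

For the example tables, `LayoutCheck.dangling` returns an empty sequence, since both relationships start at node 0 and end at nodes 1 and 2.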

Based on that, I loaded my CSV into a Spark DataFrame and then split it into:

  • a pickup DataFrame

  • a drop-off DataFrame and

  • a trip DataFrame

I generated an id for the first two DataFrames and created a mapping by adding columns to the third. This was the result (image: pair graph): a pair of nodes (pickup)-[Trip]->(drop off) is generated for each mapped row.

The problems I ran into are:

1) The geohash of a pickup or a drop-off can be repeated across different trips, so I want to merge the creation of those nodes.

2) A drop-off for one trip can be a pickup for another trip, so I need to merge these two nodes into one.

I tried to change the graph, but I was surprised to find that Spark graphs are immutable: you can't apply Cypher queries to change them.

So is there a way to build a connected, directed and merged geohash graph with Spark?

This is my code:

package org.opencypher.spark.examples

import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.{CAPSNodeTable, CAPSRelationshipTable}
import org.opencypher.spark.util.ConsoleApp
import java.net.URI
import org.opencypher.okapi.api.io.conversion.NodeMapping
import org.opencypher.okapi.api.io.conversion.RelationshipMapping
import org.opencypher.spark.api.io.neo4j.Neo4jPropertyGraphDataSource
import org.opencypher.spark.api.io.neo4j.Neo4jConfig
import org.apache.spark.sql.functions._
import org.opencypher.okapi.api.graph.GraphName

object GreenCabsInputDataFrames extends ConsoleApp {

 //1) Create CAPS session and retrieve Spark session
    implicit val session: CAPSSession = CAPSSession.local()
    val spark = session.sparkSession

 //2) Load a csv into dataframe
 val df=spark.read.csv("C:\\Users\\Ahmed\\Desktop\\green_result\\green_data.csv").select("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19","_c20","_c21","_c22","_c23")

//3) cache the dataframe
 val df1=df.cache()

 //4) subset the dataframe
 val pickup_dataframe=df1.select("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19","_c20","_c21")
 val dropoff_dataframe=df1.select("_c22","_c23")

 //5) uncache the dataframe
 df1.unpersist()

 //6) add id columns to pickup , dropoff and trip dataframes
 val pickup_dataframe2= pickup_dataframe.withColumn("id1",monotonically_increasing_id+pickup_dataframe.count()).select("id1",pickup_dataframe.columns:_*)
 val dropoff_dataframe2= dropoff_dataframe.withColumn("id2",monotonically_increasing_id+pickup_dataframe2.count()+pickup_dataframe.count()).select("id2",dropoff_dataframe.columns:_*)
 //7) create the "trip" relationship dataframe
 val trip_data_dataframe2=pickup_dataframe2.withColumn("idj",monotonically_increasing_id).join(dropoff_dataframe2.withColumn("idj",monotonically_increasing_id),"idj")

 //drop unnecessary columns
 val  pickup_dataframe3=pickup_dataframe2.drop("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19")
 val trip_data_dataframe3=trip_data_dataframe2.drop("_c20","_c21","_c22","_c23")

  //8) reordering the columns of trip dataframe

 val trip_data_dataframe4=trip_data_dataframe3.select("idj", "id1", "id2", "_c0", "_c10", "_c11", "_c12", "_c13", "_c14", "_c15", "_c16", "_c17", "_c18", "_c19", "_c3", "_c4","_c9")

  //8.1)displaying dataframes in console
   pickup_dataframe3.show()
   dropoff_dataframe2.show()
   trip_data_dataframe4.show()
 //9) mapping the columns
 val Pickup_mapping=NodeMapping.withSourceIdKey("id1").withImpliedLabel("HashNode").withPropertyKeys("_c21","_c20")
 val Dropoff_mapping=NodeMapping.withSourceIdKey("id2").withImpliedLabel("HashNode").withPropertyKeys("_c23","_c22")
 val Trip_mapping=RelationshipMapping.withSourceIdKey("idj").withSourceStartNodeKey("id1").withSourceEndNodeKey("id2").withRelType("TRIP").withPropertyKeys("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19")

 //10)  create tables
val Pickup_Table2 = CAPSNodeTable(Pickup_mapping, pickup_dataframe3)
val Dropoff_Table = CAPSNodeTable(Dropoff_mapping, dropoff_dataframe2)
val Trip_Table = CAPSRelationshipTable(Trip_mapping,trip_data_dataframe4)

//11) Create graph
val graph = session.readFrom(Pickup_Table2,Dropoff_Table, Trip_Table)

//12)  Connect to Neo4j
val boltWriteURI: URI = new URI("bolt://localhost:7687")
val neo4jWriteConfig: Neo4jConfig = new Neo4jConfig(boltWriteURI, "neo4j", Some("wakarimashta"), true)
val neo4jResult: Neo4jPropertyGraphDataSource = new Neo4jPropertyGraphDataSource(neo4jWriteConfig)(session)


 //13) Store graph in neo4j
 val neo4jResultName: GraphName = new GraphName("neo4jgraphs151")
 neo4jResult.store(neo4jResultName, graph)
 }
Could you please paste your import script? - Ira Re
I've used Spark's capability to import CSV files. Anyway, I added the full commented code to the question to make it more precise. - A.HADDAD

1 Answer

You are right, CAPS is, just like Spark, an immutable system. However, with CAPS you can create new graphs from within a Cypher statement: https://github.com/opencypher/cypher-for-apache-spark/blob/master/spark-cypher-examples/src/main/scala/org/opencypher/spark/examples/MultipleGraphExample.scala

At the moment the CONSTRUCT clause has limited support for MERGE. It only allows adding already-bound nodes to the newly created graph, and each bound node is added exactly once, independent of how many times it occurs in the binding table.

Consider the following query:

MATCH (n), (m)
CONSTRUCT
  CREATE (n), (m)
RETURN GRAPH

The resulting graph will have as many nodes as the input graph.
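
To make that concrete, here is a small model of the behaviour described above in plain Scala collections. It is not CAPS code, and `BindingTableDemo` and its methods are hypothetical names: the cross product from MATCH (n), (m) has one row per pair, yet the set of nodes that end up in the constructed graph is no larger than the input.

```scala
object BindingTableDemo {
  // Models the binding table of MATCH (n), (m): one row per (n, m) pair.
  def bindingTable(nodeIds: Seq[Long]): Seq[(Long, Long)] =
    for (n <- nodeIds; m <- nodeIds) yield (n, m)

  // Models the described CONSTRUCT behaviour: each bound node is added once,
  // no matter how many rows of the binding table mention it.
  def constructedNodes(rows: Seq[(Long, Long)]): Set[Long] =
    rows.flatMap { case (n, m) => Seq(n, m) }.toSet
}
```

With three input nodes the binding table has nine rows, while `constructedNodes` still yields three nodes.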

To solve your problem you could use one of two approaches: a) deduplicate before creating the graph, or b) use Cypher queries. Approach b) would look like:

// assuming that graph is the graph created at step 11
session.catalog.store("inputGraph", graph)

session.cypher("""
  CATALOG CREATE GRAPH temp {
    FROM GRAPH session.inputGraph
    MATCH (n)
    WITH DISTINCT n.a AS a, n.b AS b
    CONSTRUCT 
      CREATE (:HashNode {a: a, b: b})
    RETURN GRAPH
  }
""".stripMargin)

val mergeGraph = session.cypher("""
  FROM GRAPH inputGraph
  MATCH (from)-[via]->(to)
  FROM GRAPH temp
  MATCH (n), (m)
  WHERE from.a = n.a AND from.b = n.b AND to.a = m.a AND to.b = m.b
  CONSTRUCT 
    CREATE (n)-[COPY OF via]->(m)
  RETURN GRAPH
""".stripMargin).graph

Note: Use the same property names for both pickup and dropoff nodes (e.g. a and b).
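
Approach a), deduplicating before creating the graph, could be sketched roughly as follows. This is only an in-memory model on plain Scala collections, assuming a single geohash string per endpoint; `Trip`, `GeoNode`, `TripRel` and `DedupBeforeGraph` are hypothetical names, and `fare` stands in for whatever trip properties you keep. On DataFrames the same steps would presumably be a union of the pickup and drop-off geohash columns, a distinct, an id column, and two joins back onto the trips.

```scala
// Hypothetical row types; one geohash per endpoint is an assumption.
final case class Trip(pickup: String, dropoff: String, fare: Double)
final case class GeoNode(id: Long, geohash: String)
final case class TripRel(id: Long, startId: Long, endId: Long, fare: Double)

object DedupBeforeGraph {
  def build(trips: Seq[Trip]): (Seq[GeoNode], Seq[TripRel]) = {
    // One node per distinct geohash, whether it was seen as pickup or drop-off.
    val geohashes = trips.flatMap(t => Seq(t.pickup, t.dropoff)).distinct
    val nodes = geohashes.zipWithIndex.map { case (g, i) => GeoNode(i.toLong, g) }

    // Lookup from geohash to the single node id that represents it.
    val idOf = nodes.map(n => n.geohash -> n.id).toMap

    // One relationship per trip, pointing at the shared node ids.
    val rels = trips.zipWithIndex.map { case (t, i) =>
      TripRel(i.toLong, idOf(t.pickup), idOf(t.dropoff), t.fare)
    }
    (nodes, rels)
  }
}
```

The resulting node and relationship rows have the id / start / end shape that the node and relationship mappings in the question expect, so a geohash that is a drop-off in one trip and a pickup in another resolves to the same node.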