I have created a GraphFrame in Spark, and the graph currently looks as follows:

Basically, there will be many such subgraphs, each disconnected from the others. Given a particular node ID, I want to find all the other nodes within the same subgraph. For instance, if node ID 1 is given, the traversal should return 2, 10, 20, 3, 30.

I have created a motif but it doesn't give the right result.

testgraph.find("(a)-[]->(b); (c)-[]->(b)").filter("(a.id = '1')").show()

Unfortunately, the connected components function considers the whole graph. Is it possible to get all the nodes within a disconnected subgraph, given a particular node ID, using GraphFrames/GraphX?

But what's the issue with working on the whole graph? The way the connectedComponents algorithm works makes it very easy to find the disconnected subgraphs within the original graph. Basically, every vertex in the same subgraph gets its Vertex.attr set to the smallest VertexID in the subgraph. At that point, finding everything connected to a given node is as easy as finding all the vertices in the connectedComponents result graph that have the same vertex.attr value. - David Griffin
For example, in your graph pictured above, after running connectedComponents, every vertex in the left-hand subgraph would have its attr set to 1L. Every vertex in the right-hand subgraph would have its attr set to 4L. Then you can just use simple RDD operations to filter out the nodes that are in the correct subgraph. - David Griffin
Hi @DavidGriffin, I know that. However, running connectedComponents is really slow when you have billions of nodes and only need to find three or four such subgraphs. In such cases, computing connectedComponents is really costly. - sjishan
In general it won't be cheaper than connectedComponents. That being said, it works just fine for me. Are you sure you didn't mix up the types in the filter expression? - zero323
Hi @zero323, can you explain why you think it won't be cheaper? My understanding is that GraphFrame is based on DataFrame, so when I want to traverse from one particular node ID, it will first look it up in the nodes DataFrame and then traverse to find the other nodes associated with it. That is unlike a normal graph search, where you may need to start at a random node and keep traversing until you find the node ID, and only then traverse to get the other nodes associated with it. - sjishan
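
The whole-graph approach described in the comments above can be sketched in GraphX roughly as follows. This is only an illustration: the object name `ComponentByLabel`, the helper names and the toy edge list are made up here (the original picture of the graph is not available), and the only library calls relied on are `connectedComponents()`, `Graph.fromEdges` and ordinary RDD operations.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}

object ComponentByLabel {

  // Returns the IDs of all vertices in the same connected component as `nodeId`.
  def componentOf(graph: Graph[Int, Int], nodeId: VertexId): Set[VertexId] = {
    // After connectedComponents(), each vertex attribute is the smallest
    // vertex ID found in that vertex's component.
    val labels = graph.connectedComponents().vertices.cache()

    // Look up the label of the requested vertex; an unknown ID gives an empty result.
    val label = labels.filter { case (id, _) => id == nodeId }.map(_._2).collect().headOption

    label match {
      case Some(l) => labels.filter { case (_, comp) => comp == l }.map(_._1).collect().toSet
      case None    => Set.empty[VertexId]
    }
  }

  // Hypothetical toy graph with two disconnected subgraphs (assumed edges).
  def toyGraph(sc: SparkContext): Graph[Int, Int] = {
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 0), Edge(10L, 2L, 0), Edge(20L, 2L, 0),
      Edge(1L, 3L, 0), Edge(30L, 3L, 0),
      Edge(4L, 5L, 0), Edge(40L, 5L, 0)))
    Graph.fromEdges(edges, defaultValue = 0)
  }
}
```

Calling `ComponentByLabel.componentOf(graph, 1L)` still labels every vertex in the graph before filtering, which is exactly the cost raised in the comments.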

1 Answer

The connected component containing a specific vertex can be found with a BFS traversal that starts from this vertex and collects its neighbors hop by hop. This can be done through the Pregel API offered by GraphX, for which we implement three functions: a vertex program (vprog), sendMsg and mergeMsgs. The algorithm is triggered by an initial message. The center sends a message to its neighbors, which propagate it to their neighbors, and so on until the whole connected component is covered. Every vertex that receives a message is marked as checked so that it won't be activated again in later iterations.

Here is the implementation of this approach:

import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object ConnectedComponent extends Serializable {

    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setAppName("ConnectedComponent").setMaster("local")
        val sc = new SparkContext(conf)
        val vRDD = sc.objectFile[(VertexId, Int)]("/path/to/vertex/rdd/file/")
        val eRDD = sc.objectFile[Edge[Int]]("/path/to/edge/rdd/file/")
        val graph = Graph(vRDD, eRDD)
        // Any vertex ID can serve as the center; a random one is picked here.
        val center = graph.pickRandomVertex()
        val cc = extractCC(graph, center)
        cc.vertices.collect.foreach(println)

        sc.stop()
    }

    def extractCC(g: Graph[Int, Int], center: VertexId): Graph[Int, Int] = {
        /* Return the subgraph of the input graph that forms the connected component containing 'center'.
         */
        // Every vertex starts unchecked and non-propagating, and remembers the center's ID.
        val initialGraph = g.mapVertices((id, attr) => VertexData(attr, false, false, center))
        // Run Pregel, keep only the visited vertices, then restore the original vertex labels.
        val connectedComponent = initialGraph.pregel(initialMsg = 0)(vprog, sendMsg, mergeMsgs)
                                .subgraph(vpred = (id, attr) => attr.checked)
                                .mapVertices((id, vdata) => vdata.attr)
        connectedComponent
    }


    case class VertexData(attr: Int,          // label of the vertex
                          checked: Boolean,   // true once the vertex has been visited
                          propagate: Boolean, // whether the vertex may forward messages
                          center: VertexId)   // ID of the connected component's center

    def vprog(id: VertexId, vdata: VertexData, msg: Int): VertexData = {
        // msg == 0 is the initial message delivered to every vertex; msg == 1 comes from a neighbor.
        if (!vdata.checked && msg == 0 && id == vdata.center) {
          // The center starts the traversal.
          vdata.copy(checked = true, propagate = true)
        }
        else if (!vdata.checked && msg == 1) {
          // First visit: mark the vertex and let it forward the message.
          vdata.copy(checked = true, propagate = true)
        }
        else if (vdata.checked && msg == 1) {
          // Already visited: stop forwarding.
          vdata.copy(propagate = false)
        }
        else vdata
    }

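    def sendMsg(triplet: EdgeTriplet[VertexData, Int]): Iterator[(VertexId, Int)] = {
        // Edges are treated as undirected: a propagating endpoint notifies the other endpoint.
        // Only unchecked vertices are notified; otherwise a visited vertex can keep
        // receiving messages and the Pregel loop may never run out of messages.
        var it: Iterator[(VertexId, Int)] = Iterator()
        if (triplet.dstAttr.propagate && !triplet.srcAttr.checked)
          it = it ++ Iterator((triplet.srcId, 1))
        if (triplet.srcAttr.propagate && !triplet.dstAttr.checked)
          it = it ++ Iterator((triplet.dstId, 1))
        it
    }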

    def mergeMsgs(a: Int, b: Int): Int = math.max(a, b)
}
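
For intuition, and as a way to sanity-check the Pregel result on a small sample, the same flood can be written as a plain single-machine breadth-first search. This is a minimal sketch that does not use Spark at all; the object name `LocalBfs` and the edge list in the usage note are hypothetical.

```scala
import scala.collection.mutable

object LocalBfs {

  // Returns every vertex reachable from `start` when edges are treated as undirected.
  // A start vertex that appears in no edge yields a component containing only itself.
  def component(edges: Seq[(Long, Long)], start: Long): Set[Long] = {
    // Build an undirected adjacency map from the edge list.
    val adj = mutable.Map.empty[Long, List[Long]].withDefaultValue(Nil)
    for ((src, dst) <- edges) {
      adj(src) = dst :: adj(src)
      adj(dst) = src :: adj(dst)
    }

    // `checked` plays the role of the `checked` flag in the Pregel version.
    val checked = mutable.Set(start)
    val queue = mutable.Queue(start)
    while (queue.nonEmpty) {
      val v = queue.dequeue()
      for (n <- adj(v) if !checked(n)) {
        checked += n
        queue.enqueue(n)
      }
    }
    checked.toSet
  }
}
```

With an assumed edge list such as `Seq((1L, 2L), (10L, 2L), (20L, 2L), (1L, 3L), (30L, 3L), (4L, 5L))`, `LocalBfs.component(edges, 1L)` visits only the vertices connected to 1 and never touches the subgraph around 4 and 5.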