
I have a graph in which some nodes have millions of incoming edges. I need to obtain the edge count of such nodes periodically. I'm using Cassandra as the storage backend. The query is:

g.V().has('vid','qwerty').inE().count().next()

All the available documentation explains how to use Apache Spark for this from the Gremlin Console. Is it possible to write the logic outside the Gremlin Console as a Spark job and run it periodically on a Hadoop cluster?

Here's the output of the query on the Gremlin Console when I'm not using Spark:

14108889 [gremlin-server-session-1] WARN org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor - Exception processing a script on request [RequestMessage{, requestId=c3d902b7-0fdd-491d-8639-546963212474, op='eval', processor='session', args={gremlin=g.V().has('vid','qwerty').inE().count().next(), session=2831d264-4566-4d15-99c5-d9bbb202b1f8, bindings={}, manageTransaction=false, batchSize=64}}].
TimedOutException()
    at org.apache.cassandra.thrift.Cassandra$multiget_slice_result$multiget_slice_resultStandardScheme.read(Cassandra.java:14696)
    at org.apache.cassandra.thrift.Cassandra$multiget_slice_result$multiget_slice_resultStandardScheme.read(Cassandra.java:14633)
    at org.apache.cassandra.thrift.Cassandra$multiget_slice_result.read(Cassandra.java:14559)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_multiget_slice(Cassandra.java:741)
    at org.apache.cassandra.thrift.Cassandra$Client.multiget_slice(Cassandra.java:725)
    at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftKeyColumnValueStore.getNamesSlice(CassandraThriftKeyColumnValueStore.java:143)
    at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftKeyColumnValueStore.getSlice(CassandraThriftKeyColumnValueStore.java:100)
    at org.janusgraph.diskstorage.keycolumnvalue.KCVSProxy.getSlice(KCVSProxy.java:82)
    at org.janusgraph.diskstorage.keycolumnvalue.cache.ExpirationKCVSCache.getSlice(ExpirationKCVSCache.java:129)
    at org.janusgraph.diskstorage.BackendTransaction$2.call(BackendTransaction.java:288)
    at org.janusgraph.diskstorage.BackendTransaction$2.call(BackendTransaction.java:285)
    at org.janusgraph.diskstorage.util.BackendOperation.executeDirect(BackendOperation.java:69)
    at org.janusgraph.diskstorage.util.BackendOperation.execute(BackendOperation.java:55)
    at org.janusgraph.diskstorage.BackendTransaction.executeRead(BackendTransaction.java:470)
    at org.janusgraph.diskstorage.BackendTransaction.edgeStoreMultiQuery(BackendTransaction.java:285)
    at org.janusgraph.graphdb.database.StandardJanusGraph.edgeMultiQuery(StandardJanusGraph.java:441)
    at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.lambda$executeMultiQuery$3(StandardJanusGraphTx.java:1054)
    at org.janusgraph.graphdb.query.profile.QueryProfiler.profile(QueryProfiler.java:98)
    at org.janusgraph.graphdb.query.profile.QueryProfiler.profile(QueryProfiler.java:90)
    at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.executeMultiQuery(StandardJanusGraphTx.java:1054)
    at org.janusgraph.graphdb.query.vertex.MultiVertexCentricQueryBuilder.execute(MultiVertexCentricQueryBuilder.java:113)
    at org.janusgraph.graphdb.query.vertex.MultiVertexCentricQueryBuilder.edges(MultiVertexCentricQueryBuilder.java:133)
    at org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphVertexStep.initialize(JanusGraphVertexStep.java:95)
    at org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphVertexStep.processNextStart(JanusGraphVertexStep.java:101)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.hasNext(AbstractStep.java:143)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.ExpandableStepIterator.hasNext(ExpandableStepIterator.java:42)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep.processAllStarts(ReducingBarrierStep.java:83)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep.processNextStart(ReducingBarrierStep.java:113)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.next(AbstractStep.java:128)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.next(AbstractStep.java:38)
    at org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal.next(DefaultTraversal.java:200)
    at java_util_Iterator$next.call(Unknown Source)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)
    at Script13.run(Script13.groovy:1)
    at org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine.eval(GremlinGroovyScriptEngine.java:843)
    at org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine.eval(GremlinGroovyScriptEngine.java:548)
    at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:233)
    at org.apache.tinkerpop.gremlin.groovy.engine.ScriptEngines.eval(ScriptEngines.java:120)
    at org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor.lambda$eval$0(GremlinExecutor.java:290)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

However, g.V().has('vid','qwerty').inE().limit(10000).count().next() works fine and returns ==>10000


1 Answer


Here is a Java client that uses SparkGraphComputer to run the edge count as a Spark job outside the Gremlin Console:

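import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class FollowCountSpark {

    private static Graph hgraph;
    private static GraphTraversalSource traversalSource;

    public static void main(String[] args) {
        createHGraph();
        System.exit(0);
    }

    private static void createHGraph() {
        // Opens a HadoopGraph configured by the properties file.
        hgraph = GraphFactory.open("/resources/jp_spark.properties");

        // withComputer(...) makes traversals from this source run on Spark.
        traversalSource = hgraph.traversal().withComputer(SparkGraphComputer.class);
        System.out.println("traversalSource = " + traversalSource);
        getAllEdgesFromHGraph();
    }

    // Counts the incoming edges of the vertex whose "vid" is "supernode".
    // Returns -1 if the traversal fails.
    static long getAllEdgesFromHGraph() {
        try {
            GraphTraversal<Vertex, Vertex> allV = traversalSource.V();
            GraphTraversal<Vertex, Vertex> gt = allV.has("vid", "supernode");
            GraphTraversal<Vertex, Long> c = gt.inE()
//                    .limit(600000)
                    .count();
            long l = c.next();
            System.out.println("All edges = " + l);
            return l;
        } catch (Exception e) {
            System.out.println("Error while fetching the edges for vid 'supernode':");
            e.printStackTrace();
        }
        return -1;
    }
}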

And the corresponding properties file is:

storage.backend=cassandrathrift
storage.cassandra.keyspace=t_graph

cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.5
ids.block-size = 100000
storage.batch-loading = true
storage.buffer-size = 1000

# read-cassandra-3.properties
#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cassandra.Cassandra3InputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat

gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output

#
# JanusGraph Cassandra InputFormat configuration
#
# These properties define the connection settings that were used when writing data to JanusGraph.
janusgraphmr.ioformat.conf.storage.backend=cassandrathrift
# This specifies the hostname & port for Cassandra data store.
#janusgraphmr.ioformat.conf.storage.hostname=10.xx.xx.xx,xx.xx.xx.18,xx.xx.xx.141
janusgraphmr.ioformat.conf.storage.port=9160
# This specifies the keyspace where data is stored.
janusgraphmr.ioformat.conf.storage.cassandra.keyspace=t_graph

#
# Apache Cassandra InputFormat configuration
#
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
spark.cassandra.input.split.size=256

#
# SparkGraphComputer Configuration
#
spark.master=local[1]
spark.executor.memory=1g
spark.cassandra.input.split.size_in_mb=512
spark.executor.extraClassPath=/opt/lib/janusgraph/*
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator
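The file above pins spark.master=local[1], which runs Spark inside the client JVM. To reuse the same file on a cluster, one option is to load it and override a few keys in code before opening the graph. Below is a minimal sketch using only java.util.Properties. The class name SparkPropsOverride, the helper withOverrides, and the inline sample values are hypothetical, and the right master URL depends on your Spark version and cluster setup.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class SparkPropsOverride {

    // Loads key=value pairs from the reader, then applies the overrides on top.
    // Keys that are not overridden keep the value from the base file.
    static Properties withOverrides(Reader base, Map<String, String> overrides) {
        Properties props = new Properties();
        try {
            props.load(base);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        overrides.forEach(props::setProperty);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the contents of jp_spark.properties.
        String base = "spark.master=local[1]\nspark.executor.memory=1g\n";

        // Assumption: the master URL is passed as the first argument;
        // without one, the value from the file is kept.
        Map<String, String> overrides = args.length > 0
                ? Collections.singletonMap("spark.master", args[0])
                : Collections.<String, String>emptyMap();

        Properties props = withOverrides(new StringReader(base), overrides);
        System.out.println("spark.master = " + props.getProperty("spark.master"));
    }
}
```

As far as I know, TinkerPop's GraphFactory also has an open(Map) overload, so the resulting Properties object could probably be passed to it directly instead of a file path; check this against the TinkerPop version you are using.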

And the corresponding pom.xml dependencies for all the Spark- and Hadoop-specific classes:

<dependencies>
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-core</artifactId>
            <version>${janusgraph.version}</version>
        </dependency>
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-cassandra</artifactId>
            <version>${janusgraph.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.tinkerpop</groupId>
            <artifactId>spark-gremlin</artifactId>
            <version>3.2.5</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-hadoop-core</artifactId>
            <version>${janusgraph.version}</version>
        </dependency>
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-hbase</artifactId>
            <version>${janusgraph.version}</version>
        </dependency>

        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-cql</artifactId>
            <version>${janusgraph.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.8.1</version>
        </dependency>

    </dependencies>
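For the "periodically" part of the question, one option is to keep the client running and schedule the count from inside the JVM. Below is a minimal sketch with the JDK's ScheduledExecutorService. The class PeriodicEdgeCount, the helper runOnce, and the placeholder counter are hypothetical; in a real job the counter would call something like FollowCountSpark.getAllEdgesFromHGraph(), and the interval would be hours rather than milliseconds.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

public class PeriodicEdgeCount {

    // Runs the supplied count once and prints it. Exceptions are caught here
    // because an exception escaping a scheduled task cancels all later runs.
    static long runOnce(LongSupplier counter) {
        try {
            long count = counter.getAsLong();
            System.out.println("Incoming edges = " + count);
            return count;
        } catch (RuntimeException e) {
            e.printStackTrace();
            return -1;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Placeholder counter (assumption): replace with the real Spark-backed count.
        LongSupplier counter = () -> 42L;

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Fixed delay: the next run starts only after the previous one has finished,
        // so slow Spark jobs never overlap.
        scheduler.scheduleWithFixedDelay(() -> runOnce(counter), 0, 200, TimeUnit.MILLISECONDS);

        // Demo only: stop after a short while. A long-running service would skip this.
        Thread.sleep(500);
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

An external scheduler such as cron that launches the jar at the desired interval would work just as well and avoids keeping a JVM alive between runs.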

Hope this helps :)