
I have been trying to run a Scala program, and the output always seems to be something like this:

15/08/17 14:13:14 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:64)
at java.lang.StringBuilder.<init>(StringBuilder.java:97)
at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:339)
at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:83)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2344)
at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32)
at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:169)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

or like this:

15/08/19 11:45:11 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:526)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:505)
    at com.fasterxml.jackson.databind.ObjectMapper._serializerProvider(ObjectMapper.java:2846)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:2881)

Are these errors on the driver or executor side?

I am a bit confused by the memory variables that Spark uses. My current settings are:

spark-env.sh

export SPARK_WORKER_MEMORY=6G
export SPARK_DRIVER_MEMORY=6G
export SPARK_EXECUTOR_MEMORY=4G

spark-defaults.conf

# spark.driver.memory              6G
# spark.executor.memory            4G
# spark.executor.extraJavaOptions  ' -Xms5G -Xmx5G '
# spark.driver.extraJavaOptions   ' -Xms5G -Xmx5G '

Do I need to uncomment any of the variables contained in spark-defaults.conf, or are they redundant?

For example, is setting SPARK_WORKER_MEMORY equivalent to setting spark.executor.memory?

This is the part of my Scala code where it stops after a few iterations:

val filteredNodesGroups = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }.distinct.collect
for (id <- filteredNodesGroups) {
    val clusterGraph = connCompGraph.subgraph(vpred = (_, attr) => attr(pagerankIndex) == id)
    val pagerankGraph = clusterGraph.pageRank(0.15)
    val completeClusterPagerankGraph = clusterGraph.outerJoinVertices(pagerankGraph.vertices) {
        case (uid, attrList, Some(pr)) =>
            attrList :+ ("inClusterPagerank:" + pr)
        case (uid, attrList, None) =>
            attrList :+ ""
    }
    val sortedClusterNodes = completeClusterPagerankGraph.vertices.toArray.sortBy(_._2(pagerankIndex + 1))
    println(sortedClusterNodes(0)._2(1) + " with rank: " + sortedClusterNodes(0)._2(pagerankIndex + 1))
}

Many questions disguised as one. Thank you in advance!

It really depends on what you are trying to achieve. Could you provide some code to illustrate? - Francis Toth
Just to add that the master often dies after the failed execution. - sofia
I updated the original post to include the code - sofia
In most cases, it is possible to optimize your code to boost performance and avoid memory issues with the JVM. Could you roughly specify the size of your data, and why you are using that for loop? - GameOfThrows
I can't see how I can optimise this more. What I am trying to do is break a graph into smaller graphs and calculate the page rank for each of the subgraphs. - sofia

1 Answer


I'm not a Spark expert, but there is a line that seems suspicious to me:

val filteredNodesGroups = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }.distinct.collect

Basically, by using the collect method, you are pulling all of that data from your executors back to the driver (before any further processing happens). Do you have any idea of the size of this data?

To fix this, you should proceed in a more functional way. To extract the distinct values, you could for example use a groupBy and a map:

val pairs = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }
pairs.groupBy(_./* the property to group on */)
     .map { case (_, arrays) => /* map function */ }

Regarding the collect, there should be a way to sort each partition and then return only the (processed) result to the driver. I would like to help you more, but I need more information about what you are trying to do.
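One possible way to do that per-partition sort, as a rough sketch rather than a tested fix: RDD.takeOrdered keeps only the smallest elements of each partition and merges those candidates on the driver, so the whole vertex set of a cluster never has to be pulled back. The names ClusterRanking and firstByAttribute are made up for illustration, and the vertex attribute type Array[String] is an assumption based on the code in the question.

```scala
import org.apache.spark.graphx.{Graph, VertexId}

// Hypothetical helper, not part of GraphX.
object ClusterRanking {

  // Returns the vertex whose attribute at position `index` sorts first.
  // Assumes vertex attributes are Array[String]; the comparison is a plain
  // string comparison, exactly like sortBy on the same field would be.
  // Throws NoSuchElementException if the graph has no vertices.
  def firstByAttribute(
      graph: Graph[Array[String], String],
      index: Int): (VertexId, Array[String]) = {
    // Order (vertexId, attributes) pairs by the chosen attribute.
    val byAttribute =
      Ordering.by[(VertexId, Array[String]), String](_._2(index))

    // takeOrdered(1) selects the smallest element inside each partition and
    // merges those candidates, so only a few rows are sent to the driver.
    graph.vertices.takeOrdered(1)(byAttribute).head
  }
}
```

Inside the loop from the question, the `toArray.sortBy(...)` line could then become something like `val first = ClusterRanking.firstByAttribute(completeClusterPagerankGraph, pagerankIndex + 1)`. Note that the ranks are stored as strings here, so the ordering is lexicographic, not numeric, just as in the original code.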

UPDATE

After digging a little, I found that you could sort your data using shuffling, as described here

UPDATE

So far, I've tried to avoid the collect, and to avoid getting data back to the driver as much as possible, but I have no idea how to solve this:

val filteredNodesGroups = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }.distinct()
val clusterGraphs = filteredNodesGroups.map { id => connCompGraph.subgraph(vpred = (_, attr) => attr(pagerankIndex) == id) }
val pageRankGraphs = clusterGraphs.map(_.pageRank(0.15))

Basically, you need to join two RDD[Graph[Array[String], String]], but first, I don't know what key to use, and second, this would necessarily produce an RDD of RDDs (as far as I know, Spark does not support nested RDDs, so this may not even be possible). I'll try to find something later today.