1 vote

I have the following code: I read the data from my input files and create a pair RDD, which is then converted to a Map for future lookups. I then broadcast this Map, which is a few GB in size. Is there a way to do collectAsMap() in a more efficient manner, or to replace it with some other call?

val result_paired_rdd = prods_user_flattened.collectAsMap() // pulls the whole map onto the driver

val broadcast_map = sc.broadcast(result_paired_rdd) // ship the map to the executors for lookups

I get the following error. I also tried passing --executor-memory 7G to the spark-submit command.

15/08/31 08:29:51 INFO BlockManagerInfo: Removed taskresult_48 on host3:48924 in memory (size: 11.4 MB, free: 3.6 GB)
15/08/31 08:29:51 INFO BlockManagerInfo: Added taskresult_50 in memory on host3:48924 (size: 11.6 MB, free: 3.6 GB)
15/08/31 08:29:52 INFO BlockManagerInfo: Added taskresult_51 in memory on host2:60182 (size: 11.6 MB, free: 3.6 GB)
15/08/31 08:30:02 ERROR Utils: Uncaught exception in thread task-result-getter-0
java.lang.OutOfMemoryError: GC overhead limit exceeded
            at java.util.Arrays.copyOfRange(Arrays.java:2694)
            at java.lang.String.<init>(String.java:203)
            at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
            at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
            at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
            at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
            at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
            at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
            at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
            at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
            at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
            at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
            at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
            at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
            at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
            at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
            at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
            at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
            at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
            at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
All this means is that you're out of memory. Not sure how you expect anyone to help without any details of what your application is doing, or any details about where the memory is being consumed. - The Archetypal Paul

1 Answer

2 votes

From the logs it looks like the driver, not the executors, is running out of memory: the OutOfMemoryError is thrown in task-result-getter-0, a driver-side thread that deserializes task results coming back from the workers.

For actions like collect (and collectAsMap), the RDD's data from all workers is transferred to the driver JVM, so increasing --executor-memory does not help; it is the driver heap that needs to grow.
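For example, a sketch of the submit command (the main class and jar names are placeholders; --driver-memory is the standard spark-submit flag that sizes the driver heap):

spark-submit \
  --class com.example.MyApp \
  --driver-memory 8G \
  --executor-memory 7G \
  my-app.jar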

  1. Increase the driver JVM heap, e.g. with --driver-memory as in the sketch above
  2. Avoid collecting so much data onto the driver JVM in the first place, e.g. by replacing the collect-and-broadcast pattern with a join (see the sketch below)
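A minimal sketch of the join alternative, assuming prods_user_flattened is a pair RDD as in the question; other_rdd stands in for whatever RDD currently does lookups against the broadcast map (that name, and the partition count, are hypothetical):

import org.apache.spark.HashPartitioner

// Keep the lookup data distributed instead of collecting it onto the driver.
val lookup = prods_user_flattened        // RDD[(K, V)], stays on the executors
val enriched = other_rdd.join(lookup)    // RDD[(K, (W, V))], computed via a shuffle
// Optionally co-partition both sides first to control the shuffle:
// val partitioned = lookup.partitionBy(new HashPartitioner(200)).cache()

A join trades driver-side memory pressure for a shuffle, which Spark can spill to disk, so it scales to lookup data that would never fit in a single JVM.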