I have the following code to converts the I read the data from my input files and create a pairedrdd, which is then converted to a Map for future lookups. I then map this broadcast variable. This is the map that is few GB. Is there a way to do collectAsMap() in a more efficient manner or to replace it with some other call?
val result_paired_rdd = prods_user_flattened.collectAsMap()
sc.broadcast(result_paired_rdd)
I get the following error. I also tried the following param: --executor-memory 7G with spark-submit command.
15/08/31 08:29:51 INFO BlockManagerInfo: Removed taskresult_48 on host3:48924 in memory (size: 11.4 MB, free: 3.6 GB)
15/08/31 08:29:51 INFO BlockManagerInfo: Added taskresult_50 in memory on host3:48924 (size: 11.6 MB, free: 3.6 GB)
15/08/31 08:29:52 INFO BlockManagerInfo: Added taskresult_51 in memory on host2:60182 (size: 11.6 MB, free: 3.6 GB)
15/08/31 08:30:02 ERROR Utils: Uncaught exception in thread task-result-getter-0
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.<init>(String.java:203)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)