
We are using Apache Beam, executed on the Spark runner. Our case is the following: both of the two use cases below cause an OutOfMemoryError.

1) Join - we join two big tables using Apache Beam; one table is 120 GB and the other is 60 GB. This causes an OutOfMemoryError when groupByKeyOnly() is called internally in GroupCombineFunctions.java.

2) GroupByKey - we group the dataset by a key as follows: PCollection<KV<K, Iterable<V>>> costBasisRecords = masterDataResult.apply(GroupByKey.create());

This GroupByKey operation also causes OutOfMemoryErrors.

Could you please give us suggestions on how we can get these jobs to complete?

We read online about Spark's reduceByKey method - could you please guide us on how to implement equivalent functionality with the Spark runner?

Error Message:

java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.reflect.Array.newInstance(Array.java:75)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1897)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:171)
at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
Comments:

Jacek Laskowski: How do you start the application? What are the memory settings?

Vijay: We are building the project for the Spark runner and using the following spark-submit command to run the application: ./spark-submit --master yarn-cluster --driver-memory 5G --executor-memory 7G --num-executors 20 --executor-cores 3

2 Answers


reduceByKey in Spark is similar to Combine.perKey in Apache Beam; see the Beam Programming Guide for examples.

Note that reduceByKey and Combine.perKey will only help if there is an actual reduction per key; otherwise you're just going to hit the same out-of-memory problem. For example, combining all integers per key into a list will not reduce memory usage, but summing the integers per key will.
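For example, here is a minimal sketch of summing integers per key with a built-in combine; the class name, keys, and values are illustrative stand-ins, not your actual pipeline:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class SumPerKeyExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Toy keyed input standing in for the real dataset.
    PCollection<KV<String, Integer>> amounts = p.apply(
        Create.of(KV.of("acct-1", 10), KV.of("acct-1", 5), KV.of("acct-2", 7)));

    // Sum.integersPerKey() is a Combine.perKey: partial sums are computed before
    // the shuffle (like Spark's reduceByKey), so far less data has to be held in
    // memory per key than with a raw GroupByKey.
    PCollection<KV<String, Integer>> totals = amounts.apply(Sum.integersPerKey());

    p.run();
  }
}

The same pattern applies to any associative, commutative reduction by passing your own CombineFn to Combine.perKey.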


If possible, I would definitely recommend using a Combine.perKey as Lukasz suggests.

If you are unable to do that or if you still run into OOMs, try to decrease partition size by increasing the number of partitions. You can increase the number of shuffle partitions by manually setting the spark.default.parallelism configuration. This is explicitly used to determine the partitioning scheme for groupByKeyOnly shuffles.

It looks like the way to plumb configurations through is via a manually-constructed SparkContextOptions. There's a test case that shows how to do this. Note that this requires your pipeline program to directly link against Spark. For example:

import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Build a Spark context with the desired shuffle parallelism.
SparkConf conf = new SparkConf().set("spark.default.parallelism", String.valueOf(parallelism));
JavaSparkContext jsc = new JavaSparkContext(conf);
// Hand the provided context to the Spark runner via SparkContextOptions.
SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);
Pipeline p = Pipeline.create(options);
// ...

NOTE: Spark has its own limitation that all grouped values for a given key must fit in memory on the machine processing that key. If this does not hold for your datasets (i.e., you have very strong key skew), then you will need to combine rather than group by key.
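If only a few keys are extremely hot, Combine.PerKey also offers a withHotKeyFanout option that spreads the pre-combining of those keys across workers. A minimal sketch, reusing the keyed amounts collection and integer sum from the first answer (both illustrative assumptions, not your pipeline's actual types):

// Fragment only: assumes a PCollection<KV<String, Integer>> named amounts;
// a fanout of 10 is an arbitrary illustrative value.
PCollection<KV<String, Integer>> totals =
    amounts.apply(Combine.<String, Integer, Integer>perKey(Sum.ofIntegers())
        .withHotKeyFanout(10));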