
I am attempting to perform PCA with Spark MLlib (in Scala) on a RowMatrix with 2168 columns and a large number of rows. However, I have observed that even with as few as 2 rows in the matrix (a 112KB text file), the following error is always produced at the same job step:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space 
        at breeze.linalg.svd$.breeze$linalg$svd$$doSVD_Double(svd.scala:92) 
        at breeze.linalg.svd$Svd_DM_Impl$.apply(svd.scala:39) 
        at breeze.linalg.svd$Svd_DM_Impl$.apply(svd.scala:38) 
        at breeze.generic.UFunc$class.apply(UFunc.scala:48) 
        at breeze.linalg.svd$.apply(svd.scala:22) 
        at org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponents(RowMatrix.scala:380) 
        at SimpleApp$.main(scala-pca.scala:17) 
        at SimpleApp.main(scala-pca.scala) 
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
        at java.lang.reflect.Method.invoke(Method.java:601) 
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) 
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) 
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) 
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) 
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I have also observed that this error is remedied by using 1100 columns or fewer, regardless of the number of rows in the RowMatrix.

I am running Spark 1.3.0 in standalone mode across 21 nodes, with 12 workers and 20GB of memory per node. I am submitting the job via spark-submit with --driver-memory 6g and --conf spark.executor.memory=1700m. The following options are set in spark-env.sh:

SPARK_WORKER_MEMORY=1700M
SPARK_WORKER_CORES=1
SPARK_WORKER_INSTANCES=12
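
For reference, the full submit command looks roughly like this (the class name matches the code below; the master URL and jar path are placeholders standing in for my actual values):

spark-submit --class SimpleApp --master spark://master-host:7077 --driver-memory 6g --conf spark.executor.memory=1700m /path/to/app.jar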

Here is the code I am submitting:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object SimpleApp {
  def main(args: Array[String]) {
    val datafilePattern = "/path/to/data/files*.txt"
    // Standalone master URL (spark:// scheme, default port 7077)
    val conf = new SparkConf().setAppName("pca_analysis").setMaster("spark://master-host:7077")
    val sc = new SparkContext(conf)
    val lData = sc.textFile(datafilePattern).cache()

    // Parse each space-separated line into a dense vector of doubles
    val vecData = lData.map(line => line.split(" ").map(v => v.toDouble)).map(arr => Vectors.dense(arr))
    val rmat: RowMatrix = new RowMatrix(vecData)

    // Compute the top 15 principal components and project the rows onto them
    val pc: Matrix = rmat.computePrincipalComponents(15)
    val projected: RowMatrix = rmat.multiply(pc)

    println("Finished projecting rows.")
    sc.stop()
  }
}

Has anyone else experienced this problem with the computePrincipalComponents() method? Any help is much appreciated.

Try increasing the parallelism: val lData = sc.textFile(datafilePattern, 30) – pzecevic
@pzecevic Increasing the parallelism did not help; the same error occurred at the same point in execution. I tried various levels of parallelism between 0 and 15000. – zs3404635
Another thing: you don't need that cache() because you use lData only once (just saying; this probably will not solve your problem). – pzecevic
Second thing: you have 20GB available per node, but you are requesting 12 * (1.7GB + JVM overhead, which can be large) + OS memory. With that setup you need more like 30GB per node. Did you try setting SPARK_WORKER_INSTANCES to a lower value? – pzecevic

1 Answer


I just ran into this issue as well; the fix is to increase --driver-memory, to perhaps 2G or more if needed.
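
For example, roughly along these lines (the memory value, class name, master URL, and jar path are placeholders to adapt to your own job):

spark-submit --driver-memory 2g --class SimpleApp --master spark://master-host:7077 /path/to/app.jar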