
When using TensorFlow Java for inference, the amount of memory needed to make the job run on YARN is abnormally large. The job runs perfectly with Spark on my computer (2 cores, 16 GB of RAM) and takes 35 minutes to complete. But when I try to run it on YARN with 10 executors, 16 GB of memory, and 16 GB of memoryOverhead each, the executors are killed for using too much memory.

Predictions run on a Hortonworks cluster with YARN 2.7.3 and Spark 2.2.1. Previously we used DL4J for inference and everything ran in under 3 minutes. Tensors are correctly closed after usage, and we use mapPartitions to do the prediction. Each task contains approximately 20,000 records (1 MB), so this makes an input tensor of 2,000,000x14 and an output tensor of 2,000,000 (5 MB).
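
For a sense of scale, here is a back-of-the-envelope sizing of those tensors (a sketch in Scala, assuming 32-bit floats and ignoring TensorFlow's own bookkeeping; the row and column counts are the figures quoted above):

    // Rough tensor sizing, assuming 4-byte (32-bit) floats.
    // TF Java tensors live in native (off-heap) memory, so they count
    // against YARN's memoryOverhead rather than the executor heap.
    val rows          = 2000000L
    val features      = 14L
    val bytesPerFloat = 4L
    val inputBytes  = rows * features * bytesPerFloat // ~112 MB per input tensor
    val outputBytes = rows * bytesPerFloat            // ~8 MB per output tensor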

Options passed to Spark when running on YARN:

--master yarn --deploy-mode cluster --driver-memory 16G --num-executors 10 --executor-memory 16G --executor-cores 2 --conf spark.driver.memoryOverhead=16G --conf spark.yarn.executor.memoryOverhead=16G --conf spark.sql.shuffle.partitions=200 --conf spark.task.cpus=2

This configuration may work if we set spark.sql.shuffle.partitions=2000, but then the job takes 3 hours.
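
For reference, the same setting can also be applied programmatically instead of on the command line (a sketch; `spark` is assumed to be the SparkSession):

    // Equivalent to --conf spark.sql.shuffle.partitions=2000
    spark.conf.set("spark.sql.shuffle.partitions", "2000")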

UPDATE:

The difference between local and cluster was in fact due to a missing filter: we were actually running the prediction on more data than we thought.


1 Answer


To reduce the memory footprint of each partition, you should batch the records inside each partition (use grouped(batchSize)). This is faster than running the prediction for each row, and it lets you allocate tensors of a predetermined size (batchSize). If you look at the TensorFlowOnSpark Scala inference code, this is what they do. Below is a reworked example of such an implementation; this code may not compile as-is, but it should give you the idea of how to do it.

    import java.nio.FloatBuffer
    import org.tensorflow.{SavedModelBundle, Tensor}

    // lazy vals: the model is loaded once per executor JVM instead of being
    // serialized from the driver (SavedModelBundle is not serializable)
    lazy val sess = SavedModelBundle.load(modelPath, "serve").session
    lazy val numberOfFeatures = 1
    lazy val laggedFeatures = Seq("cost_day1", "cost_day2", "cost_day3")
    lazy val numberOfOutputs = 1

    val predictionsRDD = preprocessedData.rdd.mapPartitions { partition =>
      // Process the partition in fixed-size batches instead of row by row
      partition.grouped(batchSize).flatMap { batchPreprocessed =>
        val numberOfLines = batchPreprocessed.size
        val featuresShape: Array[Long] =
          Array(numberOfLines, laggedFeatures.size / numberOfFeatures, numberOfFeatures)

        // Allocate one float per feature per row (not one per row)
        val featuresBuffer: FloatBuffer =
          FloatBuffer.allocate(numberOfLines * laggedFeatures.size)

        for (
          featuresWithKey <- batchPreprocessed;
          feature <- featuresWithKey.features
        ) {
          featuresBuffer.put(feature)
        }
        featuresBuffer.flip()
        val featuresTensor = Tensor.create(featuresShape, featuresBuffer)

        val results: Tensor[_] = sess.runner
          .feed("cost", featuresTensor)
          .fetch("prediction")
          .run.get(0)

        // Copy the results onto the JVM heap before closing the tensor
        val output = Array.ofDim[Float](results.numElements() / numberOfOutputs, numberOfOutputs)
        val outputArray: Array[Array[Float]] = results.copyTo(output)

        // Close tensors explicitly to free their native (off-heap) memory
        results.close()
        featuresTensor.close()
        outputArray
      }
    }
    // Wrap each row in a Product type so createDataFrame can derive a schema
    spark.createDataFrame(predictionsRDD.map(Tuple1(_)))

We use a FloatBuffer and a shape to create the Tensor, as recommended in this issue.
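
For reference, grouped(n) on an iterator slices it into batches of at most n elements without materializing the whole partition (a minimal standalone illustration):

    // grouped(n) yields batches of size n; the last batch may be smaller.
    val batches = Iterator(1, 2, 3, 4, 5).grouped(2).toList
    // => three batches: (1, 2), (3, 4), (5)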