I am running a LogisticRegression pipeline, and on this line:

model = pipeline.fit(train_data)

I get the following error repeatedly in the RDDLossFunction stage:

File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/ml/base.py", line 132, in fit
File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 109, in _fit
File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/ml/base.py", line 132, in fit
File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 288, in _fit
File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 285, in _fit_java
File "/usr/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23199.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 9 tasks (3.4 GB) is bigger than spark.driver.maxResultSize (3.0 GB)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
  at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
  at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
  at org.apache.spark.ml.optim.loss.RDDLossFunction.calculate(RDDLossFunction.scala:61)
  at org.apache.spark.ml.optim.loss.RDDLossFunction.calculate(RDDLossFunction.scala:47)
  at breeze.optimize.CachedDiffFunction.calculate(CachedDiffFunction.scala:23)
  at breeze.optimize.FirstOrderMinimizer.calculateObjective(FirstOrderMinimizer.scala:55)
  at breeze.optimize.FirstOrderMinimizer.initialState(FirstOrderMinimizer.scala:48)
  at breeze.optimize.FirstOrderMinimizer.iterations(FirstOrderMinimizer.scala:89)
  at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:798)
  at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:488)
  at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:278)
  at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:214)
  at java.lang.Thread.run(Thread.java:748)

I have tried lowering the number of partitions from 2001 to 400, as suggested in https://translate.google.co.il/translate?hl=en&sl=zh-CN&u=http://bourneli.github.io/scala/spark/2016/09/21/spark-driver-maxResultSize-puzzle.html&prev=search , but got the same error. I also tried increasing spark.driver.maxResultSize to 3g, with no luck either.
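For reference, this is roughly how I apply those settings (a minimal sketch; the app name is just a placeholder, and the driver-side config is set before the SparkSession is created):

from pyspark.sql import SparkSession

# driver settings like spark.driver.maxResultSize only take effect if they are
# set before the SparkSession / SparkContext is created
spark = (SparkSession.builder
         .appName("lr-pipeline")                       # placeholder name
         .config("spark.driver.maxResultSize", "3g")   # raised from the 1g default, still fails
         .getOrCreate())

# repartition the training data down from 2001 to 400 partitions
train_data = train_data.repartition(400)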

I have two pipelines: one that prepares the data and runs on the whole dataset, and a second one that contains just the LogisticRegression and the label converter (IndexToString). The second one is the one that is failing.
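This is roughly what the failing pipeline looks like (a sketch only; the column names and the label_indexer_model produced by the data-preparation pipeline are placeholders for my actual ones):

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import IndexToString

# placeholder column names; label_indexer_model is the StringIndexerModel
# fitted in the first (data-preparation) pipeline
lr = LogisticRegression(featuresCol="features", labelCol="label")
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                                labels=label_indexer_model.labels)

pipeline = Pipeline(stages=[lr, label_converter])
model = pipeline.fit(train_data)  # fails in the RDDLossFunction stage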

I am running on a standalone cluster with 3 workers (140GB of memory combined) and one master with 15GB.


1 Answer


The error log clearly says: "Total size of serialized results of 9 tasks (3.4 GB) is bigger than spark.driver.maxResultSize (3.0 GB)".

Have you tried setting spark.driver.maxResultSize to something larger than 3.4 GB? Your 3g setting is still below the 3.4 GB of serialized results the driver is collecting, so the limit has to go above that.
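For example, something along these lines (a sketch; pick a value that still fits comfortably within your driver's 15GB of memory, or use "0" to remove the limit entirely):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.driver.maxResultSize", "5g")  # anything above 3.4 GB; "0" means unlimited
         .getOrCreate())

Or, equivalently, on the command line:

spark-submit --conf spark.driver.maxResultSize=5g your_script.py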