5 votes

I have a PySpark DataFrame whose dimensions are (28002528, 21), and I tried to convert it to a pandas DataFrame using the following line of code:

pd_df=spark_df.toPandas()

I got this error:

First part:

Py4JJavaError: An error occurred while calling o170.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 39.0 failed 1 times, most recent failure: Lost task 3.0 in stage 39.0 (TID 89, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:552)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:256)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)


Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
        ...
        ...

Caused by: java.lang.OutOfMemoryError: Java heap space
        ...
        ...    

Second part:

Exception happened during processing of request from ('127.0.0.1', 56842)
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:56657)
Traceback (most recent call last):
        ...
        ...    
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:
        ...
        ...

I also tried to take a sample of the original PySpark DataFrame:

sample_pd_df = spark_df.sample(0.05).toPandas()

and got an error that looks like only the first part of the previous error.


2 Answers

1 vote

You get java.lang.OutOfMemoryError, which probably means that you are trying to load all the data into a single node that doesn't have enough RAM to hold the entire DataFrame. If you are using a cloud provider such as Databricks, try increasing the cluster's RAM.
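
As a rough back-of-envelope check (assuming about 8 bytes per value, i.e. purely numeric columns; string columns cost more), the collected data alone is already several gigabytes, well above Spark's default 1g driver heap:

    rows, cols = 28002528, 21          # dimensions from the question
    bytes_per_value = 8                # assumption: ~8 bytes per numeric value
    size_gib = rows * cols * bytes_per_value / 1024**3
    print(size_gib)                    # ~4.4 GiB, before any serialization
                                       # or LZ4 buffer overhead on the Java side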

1 vote

What toPandas() does is collect the whole dataframe into a single node (as explained in @ulmefors's answer).

More specifically, it collects it to the driver. The specific option you should fine-tune is spark.driver.memory; increase it accordingly.
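
For example, here is a minimal sketch of raising it when the session is built (the 8g value and the app name are illustrative). Note that driver memory must be set before the JVM starts, so configure it at session creation or via spark-submit's --driver-memory flag; changing it on an already running session has no effect.

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("to-pandas-demo")            # hypothetical app name
        .config("spark.driver.memory", "8g")  # raised from the 1g default
        .getOrCreate()
    )

    pd_df = spark_df.toPandas()  # spark_df is the DataFrame from the question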

Otherwise, if you're planning to do further transformations on this (rather large) pandas DataFrame, consider doing them in PySpark first and then collecting the (smaller) result to the driver, which will hopefully fit in memory.
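
For instance, a sketch of that approach (the column names and aggregation are made up purely for illustration):

    from pyspark.sql import functions as F

    # Reduce the data on the Spark side first, then collect only the result.
    summary_pd = (
        spark_df
        .groupBy("some_key")                      # hypothetical column
        .agg(F.sum("some_value").alias("total"),  # hypothetical column
             F.count("*").alias("n_rows"))
        .toPandas()                               # small result fits on the driver
    )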

More details are available in the Spark configuration documentation.