
First, I join two DataFrames. The first DataFrame is filtered from the second one and is about 8 MB (260,000 records); the second is loaded from a file of about 2 GB (37,000,000 records). Then I call

joinedDF.javaRDD().saveAsTextFile("hdfs://xxx:9000/users/root/result");

I also tried

joinedDF.write().mode(SaveMode.Overwrite).json("hdfs://xxx:9000/users/root/result");

I am a bit confused because I get this exception:

ERROR TaskSetManager: Total size of serialized results of 54 tasks (1034.6 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

As far as I know, saveAsTextFile should write its output directly from the workers, so why do I get an exception related to the driver? I know about the option to increase spark.driver.maxResultSize, and I set it to unlimited, but that does not help because my driver has only 4.8 GB of memory in total.
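To see what the limit in the error actually measures: as I understand Spark's behavior, the driver adds up the size of every serialized task result it receives for a set of tasks and aborts once the running total exceeds spark.driver.maxResultSize (0 means unlimited). The following is a simplified, plain-Java model of that accounting, not Spark's source code; the class ResultSizeGuard, its methods, and the 20 MB per-task result sizes are hypothetical.

```java
import java.util.Arrays;

// HYPOTHETICAL, simplified model of the driver-side accounting behind
// "Total size of serialized results of N tasks ... is bigger than
// spark.driver.maxResultSize". Names are illustrative, not Spark's API.
class ResultSizeGuard {
    private final long maxResultSizeBytes; // <= 0 models "unlimited"
    private long totalResultSizeBytes = 0;

    ResultSizeGuard(long maxResultSizeBytes) {
        this.maxResultSizeBytes = maxResultSizeBytes;
    }

    // Called once per finished task with the size of the serialized result
    // that task sends back to the driver. Returns false once the running
    // total exceeds the limit (the point where the job would be aborted).
    boolean acceptResult(long serializedResultBytes) {
        totalResultSizeBytes += serializedResultBytes;
        return maxResultSizeBytes <= 0 || totalResultSizeBytes <= maxResultSizeBytes;
    }

    // Returns the 1-based number of the first task whose result pushes the
    // total over the limit, or -1 if all results fit.
    static int firstRejectedTask(long maxResultSizeBytes, long[] resultSizes) {
        ResultSizeGuard guard = new ResultSizeGuard(maxResultSizeBytes);
        for (int i = 0; i < resultSizes.length; i++) {
            if (!guard.acceptResult(resultSizes[i])) {
                return i + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long[] sizes = new long[54];
        Arrays.fill(sizes, 20 * mb); // 54 task results of 20 MB each (made-up sizes)
        System.out.println(firstRejectedTask(1024 * mb, sizes)); // 1024 MB limit
        System.out.println(firstRejectedTask(0, sizes));         // 0 = unlimited
    }
}
```

With a 1024 MB limit, the 52nd result pushes the total to 1040 MB and is rejected, so main prints 52; with the limit set to 0 it prints -1. A task that only reports that it has finished writing a file adds very little to this total.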


EDIT:

DataFrame df1 = table.as("A");
DataFrame df2 = table.withColumnRenamed("id", "key").filter("value = 'foo'");
joinedDF = df1.join(df2.as("B"),
         col("A.id").startsWith(col("B.key")),
         "right_outer");
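The join condition is a prefix match rather than an equality, so rows cannot simply be paired by looking up an equal key: conceptually, each B.key has to be tested against every A.id. The plain-Java sketch below only illustrates the right outer join semantics on strings (every B row is kept, and an unmatched B row gets a null A side). The class name and sample values are hypothetical, and this is not how Spark plans or executes the join.

```java
import java.util.ArrayList;
import java.util.List;

// HYPOTHETICAL in-memory illustration of a right outer join of A and B on
// A.id.startsWith(B.key). A naive nested loop over plain strings, not Spark.
class PrefixRightOuterJoin {

    // Each output row is {aId, bKey}; aId is null when no A row matched.
    static List<String[]> join(List<String> aIds, List<String> bKeys) {
        List<String[]> out = new ArrayList<>();
        for (String key : bKeys) {            // every B row is kept (right outer)
            boolean matched = false;
            for (String id : aIds) {          // no equality key: test every A row
                if (id.startsWith(key)) {
                    out.add(new String[] {id, key});
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new String[] {null, key}); // unmatched B row, null A side
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> aIds = List.of("abc1", "abc2", "xyz9");
        List<String> bKeys = List.of("abc", "q");
        for (String[] row : join(aIds, bKeys)) {
            System.out.println(row[0] + " | " + row[1]);
        }
    }
}
```

For the sample data, main prints abc1 | abc, abc2 | abc, and null | q. In this naive form the number of comparisons is the product of the two row counts.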

I also tried a broadcast variable; the only change is in df2:

DataFrame df2 = sc.broadcast(table.withColumnRenamed("id", "key").filter("value = 'foo'")).getValue();
What was your spark-submit command? The data shouldn't go through the driver to be written. Are you running in local mode, standalone, or via YARN/Mesos? - Joe Widen
I am running it on a standalone cluster in cluster mode. Here are the submit details:

/bin/spark-submit --class "dp.test.Main" --master spark://xxx.xxx.xxx.xxx:6066 --deploy-mode cluster --executor-cores 2 --executor-memory 4864m --driver-memory 4864m --driver-cores 2 hdfs://xxx:9000/users/root/myProg.jar

I also tried this, to get more executors:

/bin/spark-submit --class "dp.test.Main" --master spark://xxx.xxx.xxx.xxx:6066 --deploy-mode cluster --executor-cores 1 --executor-memory 2432m --driver-memory 4864m --driver-cores 2 hdfs://xxx:9000/users/root/myProg.jar

- HR.AD
Alright, that should be fine. Are you using a broadcast join by chance in your DataFrame? - Joe Widen
I edited the question and added the join code. - HR.AD
Take a look at this answer and see if it helps: stackoverflow.com/questions/30459365/… You can also use the Kryo serialization library, which could help. Here is a website that could help: ogirardot.wordpress.com/2015/01/09/… - Joe Widen

1 Answer


I found the answer in a related post: https://stackoverflow.com/a/29602918/5957143

To summarize @kuujo's answer:

saveAsTextFile does not send the data back to the driver. Instead, each task writes its own partition, and only the result of the save (the task's completion status) is sent back to the driver once it is complete. In other words, saveAsTextFile is distributed. The only case where it is not distributed is when you have a single partition, or you have coalesced your RDD to a single partition before calling saveAsTextFile.
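The distinction above can be modeled in plain Java: in a distributed save, each task writes its own partition to its own part file and returns only a small status to the driver, whereas merging everything into one partition first leaves a single task to write all the data. This is a hypothetical single-JVM sketch in which threads stand in for executors and a temporary directory stands in for HDFS; DistributedSaveModel, save, and coalesceToOne are illustrative names, not Spark APIs, and the part-file naming only mimics Hadoop-style output.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// HYPOTHETICAL model of a distributed save such as saveAsTextFile: each
// "task" writes one partition to its own part file, and only a small status
// (the number of lines written) travels back to the "driver". Not Spark code.
class DistributedSaveModel {

    static List<Integer> save(List<List<String>> partitions, Path outputDir, int workers)
            throws Exception {
        Files.createDirectories(outputDir);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Integer>> statuses = new ArrayList<>();
            for (int p = 0; p < partitions.size(); p++) {
                final int index = p;
                statuses.add(pool.submit(() -> {
                    // The task writes its own data; the lines never reach the driver.
                    Path part = outputDir.resolve(String.format("part-%05d", index));
                    Files.write(part, partitions.get(index));
                    return partitions.get(index).size(); // tiny status result
                }));
            }
            // The "driver" only collects the small per-task statuses.
            List<Integer> linesPerTask = new ArrayList<>();
            for (Future<Integer> f : statuses) {
                linesPerTask.add(f.get());
            }
            return linesPerTask;
        } finally {
            pool.shutdown();
        }
    }

    // Merging everything into one partition first (like coalescing to a
    // single partition) means one task writes all of the data.
    static List<List<String>> coalesceToOne(List<List<String>> partitions) {
        List<String> merged = new ArrayList<>();
        for (List<String> p : partitions) {
            merged.addAll(p);
        }
        return List.of(merged);
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> partitions =
                List.of(List.of("a", "b"), List.of("c"), List.of("d", "e", "f"));
        System.out.println(save(partitions, Files.createTempDirectory("result"), 3));
        System.out.println(save(coalesceToOne(partitions), Files.createTempDirectory("single"), 3));
    }
}
```

Running main prints [2, 1, 3] for the three-partition save and [6] after merging to one partition: in both cases the driver only sees line counts, never the lines themselves, but in the second case a single task does all the writing.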