First, I join two DataFrames: the first DF is filtered from the second DF and is about 8 MB (260,000 records); the second DF comes from a file that is roughly 2 GB (37,000,000 records). Then I call
joinedDF.javaRDD().saveAsTextFile("hdfs://xxx:9000/users/root/result");
and I also tried
joinedDF.write().mode(SaveMode.Overwrite).json("hdfs://xxx:9000/users/root/result");
I am a bit confused, because I get an exception:
ERROR TaskSetManager: Total size of serialized results of 54 tasks (1034.6 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
As far as I know, saveAsTextFile should write its output directly from the workers, so why do I get an exception related to the driver? I know about the option to increase spark.driver.maxResultSize, and I set it to unlimited, but that does not help, since my driver has only 4.8 GB of memory in total.
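For reference, this is a minimal sketch of how I set it (assuming SparkConf-based configuration; as far as I know, a value of 0 removes the limit):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        // 0 = no cap on the total size of serialized task results
        // that the driver will accept (my reading of "unlimited")
        .set("spark.driver.maxResultSize", "0");
JavaSparkContext sc = new JavaSparkContext(conf);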
EDIT:
DataFrame df1 = table.as("A");
DataFrame df2 = table.withColumnRenamed("id", "key").filter("value = 'foo'");
joinedDF = df1.join(
        df2.as("B"),
        col("A.id").startsWith(col("B.key")),
        "right_outer");
I also tried a broadcast variable; the change is in df2:
DataFrame df2 = sc.broadcast(table.withColumnRenamed("id", "key").filter("value = 'foo'")).getValue();
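Since I am not sure sc.broadcast() of a DataFrame actually ships the data (it may only broadcast the reference), a variant using the functions.broadcast() join hint would look like this (a sketch on my part; I am not certain the hint is honored for a right outer join):

import static org.apache.spark.sql.functions.broadcast;
import static org.apache.spark.sql.functions.col;

// Hint Spark to replicate the small side (df2) to every executor;
// the join condition and join type are unchanged from above.
DataFrame joinedDF = df1.join(
        broadcast(df2.as("B")),
        col("A.id").startsWith(col("B.key")),
        "right_outer");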
This is how I submit the application:
/bin/spark-submit --class "dp.test.Main" --master spark://xxx.xxx.xxx.xxx:6066 --deploy-mode cluster --executor-cores 2 --executor-memory 4864m --driver-memory 4864m --driver-cores 2 hdfs://xxx:9000/users/root/myProg.jar
I also tried this to get more executors:
/bin/spark-submit --class "dp.test.Main" --master spark://xxx.xxx.xxx.xxx:6066 --deploy-mode cluster --executor-cores 1 --executor-memory 2432m --driver-memory 4864m --driver-cores 2 hdfs://xxx:9000/users/root/myProg.jar
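For completeness, I understand spark.driver.maxResultSize can also be set at submit time with the standard --conf flag (a sketch; 0 is what I mean by "unlimited"):
/bin/spark-submit --class "dp.test.Main" --master spark://xxx.xxx.xxx.xxx:6066 --deploy-mode cluster --conf spark.driver.maxResultSize=0 --executor-cores 2 --executor-memory 4864m --driver-memory 4864m --driver-cores 2 hdfs://xxx:9000/users/root/myProg.jar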