I am using Spark in local mode, and a simple join is taking too long. I have fetched two DataFrames, A (8 columns, 2.3 million rows) and B (8 columns, 1.2 million rows), joined them with A.join(B, condition, 'left'), and called an action at the end. This creates a single job with three stages: one for extracting each DataFrame and one for the join. Surprisingly, the stage that extracts DataFrame A takes around 8 minutes and the one that extracts DataFrame B takes 1 minute, while the join itself finishes within seconds. My important configuration settings are:
- spark.master local[*]
- spark.driver.cores 8
- spark.executor.memory 30g
- spark.driver.memory 30g
- spark.serializer org.apache.spark.serializer.KryoSerializer
- spark.sql.shuffle.partitions 16
The only executor is the driver itself. While extracting the DataFrames, I partitioned them into 32 parts (I also tried 16, 64, 50, 100 and 200). I saw about 100 MB of shuffle write for the stage that extracts DataFrame A. To avoid the shuffle, I created 16 initial partitions for both DataFrames and broadcast the smaller one, B, using the broadcast(B) syntax, but it does not help: there is still shuffle write. Am I doing something wrong? Why is the shuffle still there?

Also, the event timeline shows only four cores processing at any point in time, although I have a 2 core * 4 processor machine. Why is that?
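
A minimal sketch of how the setup described above could be inspected, assuming PySpark. It is not the code from this question: the `key` column and the `spark.range` data are hypothetical stand-ins for A and B, and the helper names `physical_plan_text` and `join_strategy` are made up for illustration. It prints the parallelism Spark reports, the partition count of A, and which join operator appears in the physical plan.

```python
import contextlib
import io

# Join operator names that can appear in a Spark physical plan.
JOIN_OPERATORS = (
    "BroadcastHashJoin",
    "BroadcastNestedLoopJoin",
    "SortMergeJoin",
    "ShuffledHashJoin",
)


def physical_plan_text(df):
    """Capture the text that df.explain() prints to stdout."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def join_strategy(plan_text):
    """Return the first name from JOIN_OPERATORS found in the plan text, or None."""
    for name in JOIN_OPERATORS:
        if name in plan_text:
            return name
    return None


def demo():
    # pyspark is imported here so the helpers above work without it installed.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = (
        SparkSession.builder
        .master("local[*]")
        .appName("broadcast-join-check")
        .getOrCreate()
    )

    # Hypothetical stand-ins for the real DataFrames A and B.
    A = spark.range(0, 1000).withColumnRenamed("id", "key")
    B = spark.range(0, 100).withColumnRenamed("id", "key")

    # In a left join, B is the right-hand side, which is the side
    # that can be broadcast.
    joined = A.join(broadcast(B), on="key", how="left")

    print("default parallelism:", spark.sparkContext.defaultParallelism)
    print("partitions of A:", A.rdd.getNumPartitions())
    print("join operator:", join_strategy(physical_plan_text(joined)))

    spark.stop()
```

Calling `demo()` needs a local PySpark installation. If the operator printed for the real DataFrames is SortMergeJoin rather than BroadcastHashJoin, the broadcast hint is not being applied to the join. If it is BroadcastHashJoin, any remaining shuffle write would have to come from an earlier step such as the extraction itself.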