I have two big Hive tables which I want to join with spark.sql. Let's say table 1 has 5 million rows and table 2 has 70 million rows. The tables are stored in Hive as Snappy-compressed Parquet files.
I want to join them and compute some aggregations on a few columns, say count all rows and take the average of a column (e.g. doubleColumn), while filtering on two conditions (say on col1 and col2).
Note: I work on our test installation, a single machine (which is quite powerful, though). I expect performance would probably be different on a cluster.
My first try was to use Spark SQL:
val stat = spark.sql("SELECT count(t1.id), avg(t2.doubleColumn) " +
  " FROM db.table1 AS t1 JOIN db.table2 AS t2 " +
  " ON t1.id = t2.id " +
  " WHERE t1.col1 = 'val1' AND t1.col2 = 'val2'").collect()
Unfortunately this runs very poorly, about 5 minutes, even when I give at least 8 GB of memory to each executor and the driver (the submit flags are shown after the next snippet). I also tried the DataFrame syntax, filtering the rows first and selecting only the needed columns for better selectivity:
//Filter first and select only the needed columns
import spark.implicits._   // for the $"..." column syntax
import org.apache.spark.sql.functions

val df = spark.sql("SELECT * FROM db.table1")
val tab1 = df.filter($"col1" === "val1" && $"col2" === "val2").select("id")
val tab2 = spark.sql("SELECT id, doubleColumn FROM db.table2")
val joined = tab1.as("d1").join(tab2.as("d2"), $"d1.id" === $"d2.id")

//Take the aggregations on the joined DataFrame
//(qualify id with an alias, since both sides carry an id column)
joined.agg(
  functions.count($"d1.id").as("count"),
  functions.avg($"d2.doubleColumn").as("average")
).show()
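For reference, the 8 GB per executor and driver mentioned above was set through the submit flags, roughly like this (the exact invocation is from memory; I run locally on the test machine):

spark-shell --master local[*] --driver-memory 8g --executor-memory 8g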
But this gives no significant performance gain. How can I improve the join performance?
Which is the best way to do this, spark.sql or the DataFrame syntax?
Will giving more executors or more memory help?
Should I use cache?
I cached both DataFrames tab1 and tab2, and the join aggregation gained significantly, but I don't think caching my DataFrames is practical, since we care about concurrency, with many users asking the same analytical query simultaneously. Is there simply nothing to be done because I work on a single node, and my problems would go away once I move to a production environment on a cluster?
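For completeness, the caching attempt looked roughly like this (cache() only marks the DataFrames, so I called count() to materialize them before re-running the aggregation):

// Mark both inputs for caching and materialize them
tab1.cache().count()
tab2.cache().count()

// The join + aggregation now reads from the cached data
joined.agg(
  functions.count($"d1.id").as("count"),
  functions.avg($"d2.doubleColumn").as("average")
).show()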
Bonus question: I tried this query with Impala and it took about 40 seconds, which was way better than spark.sql. How can Impala be better than Spark?!