0
votes

I am using Spark SQL 2.4.

I have a question that has been bugging me for quite some time: should I use DISTINCT or GROUP BY (without any aggregations) to remove duplicates from a table with the best query performance?

With DISTINCT, I would use the following:

select distinct 
       id, 
       fname, 
       lname, 
       age
from emp_table;

With GROUP BY, I would use:

select id,
       fname,
       lname,
       age
from emp_table
group by 1, 2, 3, 4;

I read somewhere that in Spark SQL, DISTINCT should be used only if the cardinality of the dataset is high, and GROUP BY otherwise. However, in my day-to-day work I have seen DISTINCT perform better than GROUP BY even when cardinality is low.

So my question is: which one performs better, and in which scenarios? Can someone please enlighten me on when a query with DISTINCT would perform better and when one with GROUP BY would?

Thanks

2
try an .explain and tell us your conclusions. - thebluephantom

2 Answers

1
votes

They are functionally equivalent and will generate the same query plan. Use DISTINCT for clarity.
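To see the "functionally equivalent" part on a tiny dataset, here is a minimal sketch. Note the assumption: it uses SQLite through Python's built-in sqlite3 module as a stand-in, not Spark, so it only illustrates that both queries return the same rows. It says nothing about Spark's plan or performance. The sample rows are made up for illustration.

```python
import sqlite3

# Assumption: SQLite stands in for Spark here purely to compare result sets;
# it says nothing about Spark's execution plan or performance.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE emp_table (id INTEGER, fname TEXT, lname TEXT, age INTEGER)"
)
conn.executemany(
    "INSERT INTO emp_table VALUES (?, ?, ?, ?)",
    [
        (1, "Ann", "Lee", 30),
        (1, "Ann", "Lee", 30),  # exact duplicate of the first row
        (2, "Bob", "Ray", 41),
        (2, "Bob", "Ray", 42),  # differs in age, so it is not a duplicate
    ],
)

distinct_rows = conn.execute(
    "SELECT DISTINCT id, fname, lname, age FROM emp_table"
).fetchall()
group_by_rows = conn.execute(
    "SELECT id, fname, lname, age FROM emp_table GROUP BY id, fname, lname, age"
).fetchall()

# Neither query guarantees row order, so compare the sorted results.
same_result = sorted(distinct_rows) == sorted(group_by_rows)
print(same_result, len(distinct_rows))
```

Both queries keep three of the four rows, because only the exact duplicate is removed.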

0
votes

Here are the query plans for the two queries. As @thebluephantom said, they are identical (apart from the generated expression IDs), so there should be no performance difference whatsoever.

create table t1 (a int, b int, c int, d int);

explain select a,b,c,d from t1 group by 1,2,3,4;
== Physical Plan ==
*(2) HashAggregate(keys=[a#14, b#15, c#16, d#17], functions=[])
+- Exchange hashpartitioning(a#14, b#15, c#16, d#17, 200), true, [id=#33]
   +- *(1) HashAggregate(keys=[a#14, b#15, c#16, d#17], functions=[])
      +- Scan hive default.t1 [a#14, b#15, c#16, d#17], HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#14, b#15, c#16, d#17], Statistics(sizeInBytes=8.0 EiB)

explain select distinct a,b,c,d from t1;
== Physical Plan ==
*(2) HashAggregate(keys=[a#23, b#24, c#25, d#26], functions=[])
+- Exchange hashpartitioning(a#23, b#24, c#25, d#26, 200), true, [id=#58]
   +- *(1) HashAggregate(keys=[a#23, b#24, c#25, d#26], functions=[])
      +- Scan hive default.t1 [a#23, b#24, c#25, d#26], HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#23, b#24, c#25, d#26], Statistics(sizeInBytes=8.0 EiB)
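
If you want to check this without comparing the two plans by eye, a small script can strip the run-specific IDs and compare the text. This is only a sketch: `normalize_plan` is a hypothetical helper of mine, not a Spark API, and the Scan line is left out of both plans for brevity.

```python
import re


def normalize_plan(plan: str) -> str:
    """Replace run-specific identifiers so two plans can be compared as text."""
    # Exchange node ids such as [id=#33] differ from one query to the next.
    plan = re.sub(r"\[id=#\d+\]", "[id=#N]", plan)
    # Attribute ids such as a#14 are assigned per query; keep only the name.
    plan = re.sub(r"(\w+)#\d+", r"\1", plan)
    return plan.strip()


# Physical plans copied from the explain output above (Scan line omitted).
group_by_plan = """
*(2) HashAggregate(keys=[a#14, b#15, c#16, d#17], functions=[])
+- Exchange hashpartitioning(a#14, b#15, c#16, d#17, 200), true, [id=#33]
   +- *(1) HashAggregate(keys=[a#14, b#15, c#16, d#17], functions=[])
"""

distinct_plan = """
*(2) HashAggregate(keys=[a#23, b#24, c#25, d#26], functions=[])
+- Exchange hashpartitioning(a#23, b#24, c#25, d#26, 200), true, [id=#58]
   +- *(1) HashAggregate(keys=[a#23, b#24, c#25, d#26], functions=[])
"""

plans_match = normalize_plan(group_by_plan) == normalize_plan(distinct_plan)
print(plans_match)
```

Once the IDs are removed, the two plans are the same text: a partial HashAggregate, a hash-partitioned Exchange, and a final HashAggregate.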

The extended explain output shows that the two queries become identical once they have been optimized:

explain extended select a,b,c,d from t1 group by 1,2,3,4;
== Parsed Logical Plan ==
'Aggregate [1, 2, 3, 4], ['a, 'b, 'c, 'd]
+- 'UnresolvedRelation [t1]

== Analyzed Logical Plan ==
a: int, b: int, c: int, d: int
Aggregate [a#41, b#42, c#43, d#44], [a#41, b#42, c#43, d#44]
+- SubqueryAlias spark_catalog.default.t1
   +- HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#41, b#42, c#43, d#44], Statistics(sizeInBytes=8.0 EiB)

== Optimized Logical Plan ==
Aggregate [a#41, b#42, c#43, d#44], [a#41, b#42, c#43, d#44]
+- HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#41, b#42, c#43, d#44], Statistics(sizeInBytes=8.0 EiB)

== Physical Plan ==
*(2) HashAggregate(keys=[a#41, b#42, c#43, d#44], functions=[], output=[a#41, b#42, c#43, d#44])
+- Exchange hashpartitioning(a#41, b#42, c#43, d#44, 200), true, [id=#108]
   +- *(1) HashAggregate(keys=[a#41, b#42, c#43, d#44], functions=[], output=[a#41, b#42, c#43, d#44])
      +- Scan hive default.t1 [a#41, b#42, c#43, d#44], HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#41, b#42, c#43, d#44], Statistics(sizeInBytes=8.0 EiB)

explain extended select distinct a,b,c,d from t1;
== Parsed Logical Plan ==
'Distinct
+- 'Project ['a, 'b, 'c, 'd]
   +- 'UnresolvedRelation [t1]

== Analyzed Logical Plan ==
a: int, b: int, c: int, d: int
Distinct
+- Project [a#50, b#51, c#52, d#53]
   +- SubqueryAlias spark_catalog.default.t1
      +- HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#50, b#51, c#52, d#53], Statistics(sizeInBytes=8.0 EiB)

== Optimized Logical Plan ==
Aggregate [a#50, b#51, c#52, d#53], [a#50, b#51, c#52, d#53]
+- HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#50, b#51, c#52, d#53], Statistics(sizeInBytes=8.0 EiB)

== Physical Plan ==
*(2) HashAggregate(keys=[a#50, b#51, c#52, d#53], functions=[], output=[a#50, b#51, c#52, d#53])
+- Exchange hashpartitioning(a#50, b#51, c#52, d#53, 200), true, [id=#133]
   +- *(1) HashAggregate(keys=[a#50, b#51, c#52, d#53], functions=[], output=[a#50, b#51, c#52, d#53])
      +- Scan hive default.t1 [a#50, b#51, c#52, d#53], HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#50, b#51, c#52, d#53], Statistics(sizeInBytes=8.0 EiB)

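In fact, the optimized logical plan for the DISTINCT query is an Aggregate, not a Distinct. This suggests that the optimizer rewrites DISTINCT into the GROUP BY form (Catalyst has a rule named ReplaceDistinctWithAggregate for this), so the engine effectively runs the GROUP BY query in both cases.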
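
To make it easier to see at which stage the two queries converge, here is a rough sketch that splits the extended explain output into its sections and compares them stage by stage. `split_sections` is a hypothetical helper, and the inputs are abbreviated copies of the output above: only the parsed and optimized sections, with the HiveTableRelation lines left out.

```python
import re


def split_sections(explain_output):
    """Map each '== Section ==' header to its plan lines, attribute ids removed."""
    sections = {}
    current = None
    for line in explain_output.strip().splitlines():
        header = re.fullmatch(r"== (.+) ==", line.strip())
        if header:
            current = header.group(1)
            sections[current] = []
        elif current is not None and line.strip():
            # a#41 and a#50 name the same column in two different queries.
            sections[current].append(re.sub(r"(\w+)#\d+", r"\1", line.rstrip()))
    return sections


# Abbreviated from the explain extended output above.
group_by_explain = """
== Parsed Logical Plan ==
'Aggregate [1, 2, 3, 4], ['a, 'b, 'c, 'd]
+- 'UnresolvedRelation [t1]

== Optimized Logical Plan ==
Aggregate [a#41, b#42, c#43, d#44], [a#41, b#42, c#43, d#44]
"""

distinct_explain = """
== Parsed Logical Plan ==
'Distinct
+- 'Project ['a, 'b, 'c, 'd]
   +- 'UnresolvedRelation [t1]

== Optimized Logical Plan ==
Aggregate [a#50, b#51, c#52, d#53], [a#50, b#51, c#52, d#53]
"""

group_by_sections = split_sections(group_by_explain)
distinct_sections = split_sections(distinct_explain)

# True where both queries have the same plan text at that stage.
stage_matches = {
    name: group_by_sections[name] == distinct_sections[name]
    for name in group_by_sections
}
print(stage_matches)
```

The parsed plans differ (Aggregate versus Distinct over Project), while the optimized plans are the same Aggregate node.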