In Hadoop, a join/merge of large equi-partitioned data sets could be done without reshuffling and without a reduce phase, simply by using a map-side join with CompositeInputFormat.
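For readers unfamiliar with the Hadoop feature, here is a minimal plain-Scala sketch of the idea behind a map-side merge join. It uses no Spark or Hadoop APIs. It assumes both inputs are already split into the same number of partitions by the same partitioner, are sorted by key inside each partition, and have unique keys on each side. The names `MapSideJoinSketch`, `mergeFullOuter` and `mapSideJoin` are hypothetical. Partition k of one input is merged only with partition k of the other, so nothing is shuffled and no reduce phase is needed.

```scala
// Hypothetical sketch of a map-side (merge) full outer join.
// ASSUMPTIONS: both inputs use the SAME partitioner and partition count,
// each partition is sorted by key, and keys are unique on each side.
object MapSideJoinSketch {
  type Row = (String, Int)
  type Joined = (Option[Row], Option[Row])

  // Full outer merge of two key-sorted partitions: one linear pass, no sort.
  def mergeFullOuter(left: IndexedSeq[Row], right: IndexedSeq[Row]): Vector[Joined] = {
    val out = Vector.newBuilder[Joined]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val cmp = left(i)._1.compareTo(right(j)._1)
      if (cmp < 0) { out += ((Some(left(i)), None)); i += 1 }       // key only on the left
      else if (cmp > 0) { out += ((None, Some(right(j)))); j += 1 } // key only on the right
      else { out += ((Some(left(i)), Some(right(j)))); i += 1; j += 1 } // matching key
    }
    // Drain whichever side still has rows.
    while (i < left.length) { out += ((Some(left(i)), None)); i += 1 }
    while (j < right.length) { out += ((None, Some(right(j)))); j += 1 }
    out.result()
  }

  // Partition k of the left input is joined only with partition k of the right
  // input, so no rows move between partitions.
  def mapSideJoin(left: Seq[IndexedSeq[Row]], right: Seq[IndexedSeq[Row]]): Seq[Vector[Joined]] = {
    require(left.length == right.length, "inputs must have the same number of partitions")
    left.zip(right).map { case (l, r) => mergeFullOuter(l, r) }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical two-partition layout of the x and y data used below:
    // A, C, E go to partition 0 and B, D, F go to partition 1.
    val x = Vector(Vector("A" -> 4, "C" -> 2), Vector("B" -> 3, "D" -> 1))
    val y = Vector(Vector("C" -> 8, "E" -> 6), Vector("D" -> 7, "F" -> 5))
    mapSideJoin(x, y).flatten.foreach(println)
  }
}
```

With this layout the sketch yields the same six rows as the full outer join printed by xy.show() below, only grouped by partition instead of globally sorted.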
I am trying to figure out how to do the same in Spark:
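import org.apache.spark.sql.functions.col
import sqlContext.implicits._  // provides toDF (Spark 1.x shell; use spark.implicits._ in Spark 2+)

// Hash-partition both sides on the join key and cache them
val x = sc.parallelize(Seq(("D", 1), ("C", 2), ("B", 3), ("A", 4))).toDF("k", "v")
  .repartition(col("k")).cache()
val y = sc.parallelize(Seq(("F", 5), ("E", 6), ("D", 7), ("C", 8))).toDF("k", "v")
  .repartition(col("k")).cache()
val xy = x.join(y, x.col("k") === y.col("k"), "outer")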
x.show() y.show() xy.show()
+---+---+ +---+---+ +----+----+----+----+
| k| v| | k| v| | k| v| k| v|
+---+---+ +---+---+ +----+----+----+----+
| A| 6| | C| 12| | A| 4|null|null|
| B| 5| | D| 11| | B| 3|null|null|
| C| 4| | E| 10| | C| 2| C| 8|
| D| 3| | F| 9| | D| 1| D| 7|
| E| 2| | G| 8| |null|null| E| 6|
| F| 1| | H| 7| |null|null| F| 5|
+---+---+ +---+---+ +----+----+----+----+
So far so good. But when I check the execution plan, I see "unnecessary" sorts:
xy.explain
== Physical Plan ==
SortMergeOuterJoin [k#1283], [k#1297], FullOuter, None
:- Sort [k#1283 ASC], false, 0
: +- InMemoryColumnarTableScan [k#1283,v#1284], InMemoryRelation [k#1283,v#1284], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1283,200), None, None
+- Sort [k#1297 ASC], false, 0
+- InMemoryColumnarTableScan [k#1297,v#1298], InMemoryRelation [k#1297,v#1298], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1297,200), None, None
Is it possible to avoid sorts here?
Edit
For reference, Hadoop has had this "feature" since 2007: https://issues.apache.org/jira/browse/HADOOP-2085
Update
As Lezzar pointed out, repartition() alone is not sufficient to achieve an equi-partitioned, sorted state. It needs to be followed by sortWithinPartitions(), so this should do the trick:
val x = sc.parallelize(Seq(("F", 1), ("E", 2), ("D", 3), ("C", 4), ("B", 5), ("A", 6))).toDF("k", "v")
  .repartition(col("k")).sortWithinPartitions(col("k")).cache()
val y = sc.parallelize(Seq(("H", 7), ("G", 8), ("F", 9), ("E", 10), ("D", 11), ("C", 12))).toDF("k", "v")
  .repartition(col("k")).sortWithinPartitions(col("k")).cache()
// Rebuild the join so that it uses the new x and y
val xy = x.join(y, x.col("k") === y.col("k"), "outer")
xy.explain()
== Physical Plan ==
SortMergeOuterJoin [k#1055], [k#1069], FullOuter, None
:- InMemoryColumnarTableScan [k#1055,v#1056], InMemoryRelation [k#1055,v#1056], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1055 ASC], false, 0, None
+- InMemoryColumnarTableScan [k#1069,v#1070], InMemoryRelation [k#1069,v#1070], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1069 ASC], false, 0, None
No sorting anymore!
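To see why repartition() alone leaves partitions unsorted (Lezzar's point above), here is a small plain-Scala model of the two steps. It is only an analogue: `hashPartition` assigns rows with floorMod(key.hashCode, n), which is an assumption (Spark's hashpartitioning uses its own hash function, so real partition contents differ), and all names are hypothetical. Rows keep their arrival order inside a bucket, so sorting has to be done separately in each partition, which is the step sortWithinPartitions() adds. Because that sort never moves rows between partitions, x and y stay co-partitioned.

```scala
// Hypothetical plain-Scala analogue of repartition(col("k")) followed by
// sortWithinPartitions(col("k")).
// ASSUMPTION: partition = floorMod(key.hashCode, n); Spark uses a different hash.
object SortWithinPartitionsSketch {
  type Row = (String, Int)

  // Analogue of repartition(col("k")): rows keep their input order inside each bucket.
  def hashPartition(rows: Seq[Row], n: Int): Vector[Vector[Row]] = {
    val buckets = Vector.fill(n)(Vector.newBuilder[Row])
    rows.foreach { case row @ (k, _) => buckets(java.lang.Math.floorMod(k.hashCode, n)) += row }
    buckets.map(_.result())
  }

  // Analogue of sortWithinPartitions(col("k")): each partition is sorted on its own,
  // no rows move between partitions.
  def sortWithinPartitions(parts: Vector[Vector[Row]]): Vector[Vector[Row]] =
    parts.map(_.sortBy(_._1))

  // True if the keys in one partition are in ascending order.
  def isSortedByKey(part: Seq[Row]): Boolean =
    part.map(_._1).sliding(2).forall(w => w.length < 2 || w(0).compareTo(w(1)) <= 0)

  def main(args: Array[String]): Unit = {
    val x = Seq("F" -> 1, "E" -> 2, "D" -> 3, "C" -> 4, "B" -> 5, "A" -> 6)
    val hashed = hashPartition(x, 4)
    println(hashed.map(isSortedByKey))                       // Vector(true, false, false, true)
    println(sortWithinPartitions(hashed).map(isSortedByKey)) // Vector(true, true, true, true)
  }
}
```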