It can be done using either PairRDDFunctions or Spark Data Frames. Since data frame operations benefit from the Catalyst optimizer, the second option is worth considering.
Assuming your data looks as follows:
rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])
With PairRDDs:
Inner join:
rdd1.join(rdd2)
Left outer join:
rdd1.leftOuterJoin(rdd2)
Cartesian product (doesn't require RDD[(T, U)]):
rdd1.cartesian(rdd2)
Broadcast join (doesn't require RDD[(T, U)]):
Finally there is cogroup, which has no direct SQL equivalent but can be useful in some situations:
cogrouped = rdd1.cogroup(rdd2)
cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]
With Spark Data Frames
You can either use the SQL DSL or execute raw SQL using spark.sql.
df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))
# Register temporary views to be able to use `spark.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
Inner join:
# 'inner' is the default value of `how`, so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
Left outer join:
df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')
Cross join (Spark 2.x requires either an explicit cross join or a configuration change, spark.sql.crossJoin.enabled):
df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')
# Implicit cross join (no join condition); in Spark 2.x this needs spark.sql.crossJoin.enabled
df1.join(df2)
spark.sql('SELECT * FROM df1 JOIN df2')
Since 1.6 (1.5 in Scala), each of these can be combined with the broadcast function:
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), df1.k == df2.k)
to perform a broadcast join. See also "Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark".