14
votes

We plan to move Apache Pig code to the new Spark platform.

Pig has a "Bag/Tuple/Field" concept and behaves similarly to a relational database. Pig provides support for CROSS/INNER/OUTER joins.

For CROSS JOIN, we can use alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n];

But as we move to the Spark platform I couldn't find any counterpart in the Spark API. Do you have any idea?

2
It's not ready yet but spork(pig on spark) is being built currently, so you may not have to change any of your codeaaronman

2 Answers

23
votes

It is oneRDD.cartesian(anotherRDD).

4
votes

Here is the recommended version for Spark 2.x Datasets and DataFrames:

scala> val ds1 = spark.range(10)
ds1: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> ds1.cache.count
res1: Long = 10

scala> val ds2 = spark.range(10)
ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> ds2.cache.count
res2: Long = 10

scala> val crossDS1DS2 = ds1.crossJoin(ds2)
crossDS1DS2: org.apache.spark.sql.DataFrame = [id: bigint, id: bigint]

scala> crossDS1DS2.count
res3: Long = 100

Alternatively it is possible to use the traditional JOIN syntax with no join condition. Use this configuration option to avoid the error that follows.

spark.conf.set("spark.sql.crossJoin.enabled", true)

Error when that configuration is omitted (using the "join" syntax specifically):

scala> val crossDS1DS2 = ds1.join(ds2)
crossDS1DS2: org.apache.spark.sql.DataFrame = [id: bigint, id: bigint]

scala> crossDS1DS2.count
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
...
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

Related: spark.sql.crossJoin.enabled for Spark 2.x