
In PySpark I want to do a full outer join of two RDDs of key-value pairs, where the keys may be None. For example:

rdd1 = sc.parallelize([(None, "a"), (None, "b")])
rdd2 = sc.parallelize([(None, "c"), (None, "d")])
join_rdd = rdd1.join(rdd2)

It looks like PySpark joins the records whose keys are None:

print(rdd1.join(rdd2).take(10))
>>> [(None, ('a', 'c')), (None, ('a', 'd')), (None, ('b', 'c')), (None, ('b', 'd'))]

However, in SQL when I join two tables:

Table1:    Table2:
key   val   key   val
NULL  a     NULL  c
NULL  b     NULL  d

SELECT * FROM Table1 JOIN Table2 ON Table1.key = Table2.key

I have an empty result set.

I suppose that this is because in Python None == None is True, while in SQL NULL = NULL evaluates to NULL (unknown) rather than true.
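
A quick check of the Python half of that supposition (a minimal sketch, nothing PySpark-specific):

# In plain Python, None is an ordinary hashable value that equals itself,
# so any hash-based grouping treats it like a regular key.
print(None == None)              # True
print(hash(None) == hash(None))  # True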

I have two questions:

  1. Is there a way to emulate the SQL behaviour and force PySpark not to join on None keys?

  2. Is it a bug or a feature? As an SQL user I expected that joining on null keys returns nothing. I am new to PySpark and found nothing in the documentation about joining on Nones. Maybe it's worth adding a note to the Spark Programming Guide?

Or am I wrong somewhere?

Thanks!


1 Answer


Your expectations are wrong. The RDD API doesn't follow SQL semantics and was never intended to. RDD.join is simply hash-based linking with portable_hash, which is deliberately designed to give None a meaningful hash in the first place.
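
You can see this directly (a sketch; newer Spark versions require PYTHONHASHSEED to be set before portable_hash will run on the driver):

import os
os.environ.setdefault("PYTHONHASHSEED", "0")  # required by portable_hash on Python 3

from pyspark.rdd import portable_hash

# None gets a fixed, deterministic hash (0 in current implementations), so all
# None keys land in the same partition and are matched against each other by RDD.join.
print(portable_hash(None))
print(portable_hash(None) == portable_hash(None))  # True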

If you want SQL-like semantics, use Spark SQL / DataFrames:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("_1", IntegerType(), True),
    StructField("_2", StringType(), False)
])

df1 = sqlContext.createDataFrame(rdd1, schema)
df2 = sqlContext.createDataFrame(rdd2, schema)

# null keys never satisfy the equi-join condition, so this result is empty
df1.join(df2, ["_1"])
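
As a quick check (a sketch, reusing rdd1, rdd2 and the schema above), the DataFrame join reproduces the empty SQL result:

print(df1.join(df2, ["_1"]).count())  # 0 -- null keys don't match, as in SQL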

If you want to achieve a similar result on RDDs, filter out the None keys before the join:

# For an inner join, dropping None keys from one side is enough:
# no None key survives on the left, so None rows on the right can never match.
rdd1.filter(lambda x: x[0] is not None).join(rdd2)
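
With a mix of null and non-null keys the effect is easier to see (a sketch with hypothetical data, not from the question):

left = sc.parallelize([(None, "a"), (1, "b")])
right = sc.parallelize([(None, "c"), (1, "d")])

# Only the non-None key survives the filter, so only it can join
print(left.filter(lambda x: x[0] is not None).join(right).collect())
# [(1, ('b', 'd'))]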