6
votes

I am using the DataFrame API of pyspark (Apache Spark) and am running into the following problem:

When I join two DataFrames that originate from the same source DataFrame, the resulting DF will explode to a huge number of rows. A quick example:

I load a DataFrame with n rows from disk:

df = sql_context.parquetFile('data.parquet')

Then I create two DataFrames from that source.

df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')

Finally I want to (inner) join them back together:

df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')

The key in col1 is unique. The resulting DataFrame should have n rows, but instead it has n*n rows.

That does not happen when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.
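
For reference, a self-contained version of the same steps (with made-up in-memory data, so it may or may not reproduce the behaviour I see with the parquet-backed DataFrame) looks roughly like this:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext()
    sql_context = SQLContext(sc)

    # Stand-in for the parquet data: col1 is a unique key
    rows = [(i, 'a%d' % i, 'b%d' % i) for i in range(500)]
    df = sql_context.createDataFrame(rows, ['col1', 'col2', 'col3'])

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

    df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')
    print(df_joined.count())  # expected: 500 (one row per unique col1)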

Can anyone explain why that happens?

2
Sorry, are you wanting df_one.merge(df_two, left_on='col1', right_on='col2', how='inner')? - EdChum
@EdChum Sorry, I forgot to mention that I am using Apache Spark and edited the question to reflect that. There is no such thing as merge on a Spark DataFrame I'm afraid. - karlson
OK thought this might be a pandas question - EdChum
Could you elaborate a bit on your use case, since splitting and then joining back together sounds a bit sub-optimal? Why the split? - Marko Bonaci
@MarkoBonaci This is a contrived example. In reality one might want to do something complex with part of the data, do something else with another part of the data and then maybe implement a filter by inner joining the two results. There are always other means to achieve the same thing of course, but still the above should work, right? - karlson
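
To make that use case a little more concrete, the general shape (with purely hypothetical columns user_id, amount and status, not from the question) would be something like:

    from pyspark.sql import functions as F

    # Aggregate one projection of the source DataFrame...
    totals = (df.select('user_id', 'amount')
                .groupBy('user_id')
                .agg(F.sum('amount').alias('total')))
    big_spenders = totals.where(totals['total'] > 1000)

    # ...filter another projection of the same source...
    active = df.select('user_id', 'status').where(df['status'] == 'active')

    # ...and use an inner join as a filter: keep users present in both results
    filtered = big_spenders.join(active,
                                 big_spenders['user_id'] == active['user_id'],
                                 'inner')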

2 Answers

4
votes

If I'm reading this correctly, df_two doesn't have a col2:

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

So when you do:

    df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner')

That should fail. Perhaps you meant:

    df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')

Either way, the fact that both DataFrames come from the same source should have no impact. I would suggest that you do:

    df_one.show()
    df_two.show()

To ensure that the data you've selected is what you expected.
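
In the same spirit, a quick count comparison would confirm whether col1 is really unique on both sides and whether the join is exploding (using the names from the question):

    # If col1 is truly unique, total and distinct counts should match on each side
    print(df_one.count(), df_one.select('col1').distinct().count())
    print(df_two.count(), df_two.select('col1').distinct().count())

    # The joined row count should then equal the original n, not n*n
    print(df_joined.count())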

1
votes

I'm seeing this problem with my large dataset too, on Spark 1.3. Unfortunately, in the small, contrived examples I made up, 'join' works correctly. I suspect there's some underlying bug in the steps preceding the join.

Performing the join (Note: DateTime is just a string):

    > join = df1.join(df2, df1.DateTime == df2.DateTime, "inner")
    > join.count()
    250000L

This is obviously returning the full 500*500 Cartesian product.

What does work for me is switching to SQL:

    > sqlc.registerDataFrameAsTable(df1, "df1")
    > sqlc.registerDataFrameAsTable(df2, "df2")
    > join = sqlc.sql("select * from df1, df2 where df1.DateTime = df2.DateTime")
    > join.count()
    471L

That value looks right.

Seeing this, I personally will not be using pyspark's DataFrame.join() until I can understand this difference better.
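
For what it's worth, another workaround that's often suggested for joins where both sides come from the same parent DataFrame (I haven't verified it against the behaviour above) is to rename the join column on one side so the two references can't be confused:

    # Rename the key on one side so the join condition is unambiguous
    df_two_renamed = df_two.withColumnRenamed('col1', 'col1_right')

    df_joined = df_one.join(df_two_renamed,
                            df_one['col1'] == df_two_renamed['col1_right'],
                            'inner')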