
I'm working to convert a known-working SQL query to PySpark, given two DataFrames, using methods such as .join, .where, .filter, etc.

Here are examples of SQL queries that work (selecting only r.id here, though I will normally select more columns):

# "invalid" records, where there is a matching `record_id` for rv_df
SELECT DISTINCT(r.id) FROM core_record AS r LEFT OUTER JOIN core_recordvalidation rv ON r.id = rv.record_id WHERE r.job_id = 41 AND rv.record_id is not null;

# "valid" records, where there is no matching `record_id` for rv_df
SELECT DISTINCT(r.id) FROM core_record AS r LEFT OUTER JOIN core_recordvalidation rv ON r.id = rv.record_id WHERE r.job_id = 41 AND rv.record_id is null;

I'm 80/20 close, but having trouble wrapping my head around the last few steps, and/or how to do this most efficiently.

I've got a DataFrame r_df with column id that I'd like to join with DataFrame rv_df on column record_id. As output, I'd like only distinct r.id, and only columns from r_df, none from rv_df. Finally, I'd like two different calls: one where there is a match (what will be "invalid" records for me), and one where there is not a match (what I consider "valid" records).

I have PySpark queries that get close, but I'm not terribly clear on how to ensure that r_df.id is distinct, or how to select only columns from r_df and none from rv_df.
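
For reference, here is a minimal sketch of how the two DataFrames might be set up; the column names match the description above, but the sample values are purely illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# r_df: one row per record, with an id and the job it belongs to
r_df = spark.createDataFrame([(1, 41), (2, 41), (3, 41), (4, 42)], ['id', 'job_id'])

# rv_df: validation rows keyed by record_id (may contain duplicates)
rv_df = spark.createDataFrame([(1,), (1,), (3,)], ['record_id'])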

Any help would be much appreciated!

Translating your first ("invalid") query to DataFrame functions would be: invalid_df = r_df.alias('r').join(rv_df.withColumn('id', f.col('record_id')).alias('rv'), on='id', how='left_outer').where('(r.job_id = 41) AND (rv.record_id IS NOT NULL)').select('r.id').distinct(). Based on the docs for join: a column passed to on must exist on both sides, which is why I added an id column to rv_df. – pault
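
Spelled out, that suggestion might look like the sketch below (the f alias for pyspark.sql.functions is an assumption, and the "valid" variant just flips the null check; the join key is selected unqualified since it appears only once after an on='id' join):

import pyspark.sql.functions as f

# give rv_df an id column so the join key exists on both sides
rv_keyed = rv_df.withColumn('id', f.col('record_id')).alias('rv')

# "invalid" records: a matching record_id exists in rv_df
invalid_df = (r_df.alias('r')
              .join(rv_keyed, on='id', how='left_outer')
              .where('(r.job_id = 41) AND (rv.record_id IS NOT NULL)')
              .select('id')
              .distinct())

# "valid" records: no matching record_id in rv_df
valid_df = (r_df.alias('r')
            .join(rv_keyed, on='id', how='left_outer')
            .where('(r.job_id = 41) AND (rv.record_id IS NULL)')
            .select('id')
            .distinct())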

1 Answer


Just had to walk away for a couple hours. Found a solution that works for my use case.

First, select only the distinct record_id values from rv_df:

rv_df = rv_df.select('record_id').distinct()

Then use that for the intersection ("invalid" records) and the disjoint ("valid" records):

# Intersection: records that have a matching record_id ("invalid" records)
invalid_df = r_df.join(rv_df, r_df.id == rv_df.record_id, 'leftsemi').select(r_df['*'])

# Disjoint: records with no matching record_id ("valid" records)
valid_df = r_df.join(rv_df, r_df.id == rv_df.record_id, 'leftanti').select(r_df['*'])
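
If only the distinct ids are needed, as in the original SELECT DISTINCT(r.id), a final projection might look like the sketch below; the job_id = 41 filter is carried over from the SQL above:

# distinct ids of "invalid" records (a matching record_id exists)
invalid_ids = (r_df.filter(r_df.job_id == 41)
               .join(rv_df, r_df.id == rv_df.record_id, 'leftsemi')
               .select('id')
               .distinct())

# distinct ids of "valid" records (no matching record_id)
valid_ids = (r_df.filter(r_df.job_id == 41)
             .join(rv_df, r_df.id == rv_df.record_id, 'leftanti')
             .select('id')
             .distinct())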