I'm working to convert a known-working SQL query to PySpark, given two DataFrames, using methods such as .join, .where, .filter, etc.
Here are examples of SQL queries that work (selecting only r.id here; I will normally select more columns):
# "invalid" records, where there is a matching `record_id` for rv_df
SELECT DISTINCT(r.id) FROM core_record AS r LEFT OUTER JOIN core_recordvalidation rv ON r.id = rv.record_id WHERE r.job_id = 41 AND rv.record_id is not null;
# "valid" records, where there is no matching `record_id` for rv_df
SELECT DISTINCT(r.id) FROM core_record AS r LEFT OUTER JOIN core_recordvalidation rv ON r.id = rv.record_id WHERE r.job_id = 41 AND rv.record_id is null;
I'm 80/20 close, but having trouble wrapping my head around the last few steps, and/or how to do this most efficiently.
I've got a DataFrame r_df with column id that I'd like to join with DataFrame rv_df on column record_id. As output, I'd like only distinct r.id, and only columns from r_df, none from rv_df. Finally, I'd like two different calls: one where there is a match (what will be "invalid" records for me), and one where there is not a match (what I consider "valid" records).
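To make that concrete, the join itself looks roughly like this (a sketch only, using the column names described above; everything after the join is the part I'm unsure about):
joined = r_df.join(rv_df, r_df['id'] == rv_df['record_id'], how='left_outer').where(r_df['job_id'] == 41)
# still needed: distinct r_df.id only, no rv_df columns, split into matched vs. unmatched rows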
I have PySpark queries that get close, but I'm not terribly clear on how to ensure that r_df.id is distinct, or how to select only columns from r_df and none from rv_df.
Any help would be much appreciated!
A comment from pault suggests:
invalid_df = r_df.alias('r').join(rv_df.withColumn('id', f.col('record_id')).alias('rv'), on='id', how='left_outer').where('(r.job_id = 41) AND (rv.record_id is not null)').select('r.id').distinct()
Based on the docs for `join`: the column(s) must exist on both sides, which is why I created an `id` column on `rv_df`. – pault
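Building on that comment, a sketch of both calls might look like the following (untested against my data; it joins on an explicit condition instead, so no extra column has to be added to rv_df, and selecting 'r.id' keeps only r_df's column):
import pyspark.sql.functions as f

# left join r_df (aliased r) to rv_df (aliased rv), restricted to job 41
joined = r_df.alias('r').join(rv_df.alias('rv'), f.col('r.id') == f.col('rv.record_id'), how='left_outer').where(f.col('r.job_id') == 41)

# "invalid" records: a matching record_id exists in rv_df
invalid_df = joined.where(f.col('rv.record_id').isNotNull()).select('r.id').distinct()

# "valid" records: no matching record_id in rv_df
valid_df = joined.where(f.col('rv.record_id').isNull()).select('r.id').distinct()
To keep every column from r_df rather than just id, .select('r.*') should do it and still excludes everything from rv_df; for the "valid" side alone, a how='left_anti' join is another option, since it returns only the r_df rows with no match and only r_df's columns.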