0 votes
df = spark.createDataFrame([("1gh","25g","36h"),("2gf","3ku","4we"),("12w","53v","c74"),("1a2","3d4","4c5"),("232","3df","4rt")], ["a","b","c"])


filter_df = spark.createDataFrame([("2gf","3ku"),("12w","53v"), ("12w","53v")], ["a","b"])

I took column "a" of filter_df, created an RDD from it, and then converted it to a list with the following code:

unique_list = filter_df.select("a").rdd.flatMap(lambda x: x).distinct().collect()

This gives me:

unique_list = [u'2gf', u'12w']

Then I tried filtering df with that list using isin. But this gives me the GC allocation-failure messages shown below:

final_df = df.filter(F.col("a").isin(unique_list))

118.255: [GC (Allocation Failure) [PSYoungGen: 1380832K->538097K(1772544K)] 2085158K->1573272K(3994112K), 0.0622847 secs] [Times: user=2.31 sys=1.76, real=0.06 secs]
122.540: [GC (Allocation Failure) [PSYoungGen: 1772529K->542497K(2028544K)] 2807704K->1581484K(4250112K), 0.3217980 secs] [Times: user=11.16 sys=13.15, real=0.33 secs]
127.071: [GC (Allocation Failure) [PSYoungGen: 1776929K->542721K(2411008K)] 2815916K->1582011K(4632576K), 0.8024852 secs] [Times: user=58.43 sys=4.85, real=0.80 secs]
133.284: [GC (Allocation Failure) [PSYoungGen: 2106881K->400752K(2446848K)] 3146171K->1583953K(4668416K), 0.4198589 secs] [Times: user=18.31 sys=12.58, real=0.42 secs]
139.050: [GC (Allocation Failure) [PSYoungGen: 1964912K->10304K(2993152K)] 3148113K->1584408K(5214720K), 0.0712454 secs] [Times: user=2.92 sys=0.88, real=0.08 secs]
146.638: [GC (Allocation Failure) [PSYoungGen: 2188864K->12768K(3036160K)] 3762968K->1588544K(5257728K), 0.1212116 secs] [Times: user=3.05 sys=3.74, real=0.12 secs]
154.153: [GC (Allocation Failure) [PSYoungGen: 2191328K->12128K(3691008K)] 3767104K->1590112K(5912576K), 0.1179030 secs] [Times: user=6.94 sys=0.11, real=0.12 secs]

Required output:

final_df

+---+---+---+
|  a|  b|  c|
+---+---+---+
|2gf|3ku|4we|
|12w|53v|c74|
+---+---+---+

What is the most effective way to filter a Spark DataFrame using another RDD, a list, or a different DataFrame? The data above is just a sample; in practice I have a much bigger dataset.

2
Use a left_semi join between df and filter_df if you don't want to see any duplicates. Use an inner join if you don't mind duplicates. If you still want an inner join over a semi join and need to filter duplicates, call .distinct() on filter_df before performing the join. – jay

2 Answers

0 votes

Use a left_semi join:

df.join(filter_df, ['a', 'b'], 'left_semi')
0 votes

You can use an inner join. Note that the comparisons must be parenthesized, because in PySpark `&` binds tighter than `==`:

df.join(filter_df, (df.a == filter_df.a) & (df.b == filter_df.b))