
I have two Spark DataFrames.
DataFrame A:

Col_A1     Col_A2  
1      ["x", "y", "z"]  
2      ["a", "x", "y"]  
3      ["a", "b", "c"]

DataFrame B:

Col_B1  
"x"  
"a"  
"y"

I want to find the rows of DataFrame A whose Col_A2 contains, say, "x" (a value from DataFrame B), and return those rows as a new DataFrame. I then want to repeat this for each of the remaining values in DataFrame B.

Output needs to be something like:

DataFrame A_x:

Col_A1     Col_A2  
1      ["x", "y", "z"]  
2      ["a", "x", "y"]

DataFrame A_a:

Col_A1     Col_A2  
2      ["a", "x", "y"]  
3      ["a", "b", "c"]

DataFrame A_y:

Col_A1     Col_A2  
1      ["x", "y", "z"]  
2      ["a", "x", "y"]
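Put differently, the goal is a mapping from each value of B to the subset of A's rows whose array contains that value. A plain-Python sketch of that mapping, using lists of tuples as a stand-in for the two DataFrames (an assumption for illustration only, no Spark involved), reproduces the expected output above:

```python
# Rows of DataFrame A modeled as (Col_A1, Col_A2) tuples; a stand-in, not Spark
rows_a = [(1, ["x", "y", "z"]),
          (2, ["a", "x", "y"]),
          (3, ["a", "b", "c"])]
# Col_B1 of DataFrame B
values_b = ["x", "a", "y"]

# For each value of B, keep the rows of A whose Col_A2 list contains it
result = {v: [row for row in rows_a if v in row[1]] for v in values_b}

for v, rows in result.items():
    print(f"A_{v}:", [row[0] for row in rows])
# A_x: [1, 2]
# A_a: [2, 3]
# A_y: [1, 2]
```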

I tried using UDFs and the map function, but didn't get what I was looking for. Thanks in advance.

Is it feasible to collect() dataframe B, or is it so big that it would be prohibitive? - desertnaut
Answer not useful?? - desertnaut

1 Answer


If your DataFrame B is small enough to be collected to a list, and the number of its distinct values is also small, you could write a simple UDF for each of its elements [UPDATE: see the end of this post for an easier way]; here is an example for 'x':

spark.version
# u'2.2.0'

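from pyspark.sql import Row
from pyspark.sql.functions import udf

df_a = spark.createDataFrame([Row(1, ["x", "y", "z"]),
                              Row(2, ["a", "x", "y"]),
                              Row(3, ["a", "b", "c"])],
                              ["col_A1", "col_A2"])

# Boolean UDF: true when the array in col_A2 contains 'x'
# (the None check avoids a TypeError on null arrays)
@udf('boolean')
def x_isin(v):
  return v is not None and 'x' in v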

temp_x = df_a.withColumn('x_isin', x_isin(df_a.col_A2))
temp_x.show()
# +------+---------+------+
# |col_A1|   col_A2|x_isin|
# +------+---------+------+
# |     1|[x, y, z]|  true|
# |     2|[a, x, y]|  true|
# |     3|[a, b, c]| false|
# +------+---------+------+

df_a_x = temp_x.filter(temp_x.x_isin).drop('x_isin')  # keep matching rows, drop the helper column
df_a_x.show()
# +------+---------+ 
# |col_A1|   col_A2|
# +------+---------+
# |     1|[x, y, z]|
# |     2|[a, x, y]|
# +------+---------+

UPDATE (after Marie's comment):

Thanks to Marie for pointing out the array_contains function; with it you do not need a UDF at all to build temp_x:

import pyspark.sql.functions as func

temp_x = df_a.withColumn('x_isin', func.array_contains(df_a.col_A2, 'x'))
temp_x.show() # same result as shown above