My goal is to merge two DataFrames on the column id, and to perform a somewhat more complex merge on another column, which contains JSON and which we can call data.

Suppose I have the DataFrame df1 that looks like this:

id | data
---------------------------------
42 | {'a_list':['foo'],'count':1}
43 | {'a_list':['scrog'],'count':0}

And I'm interested in merging with a similar, but different DataFrame df2:

id | data
---------------------------------
42 | {'a_list':['bar'],'count':2}
44 | {'a_list':['baz'],'count':4}

And I would like the following DataFrame, joining and merging properties from the JSON data where id matches, but retaining rows where id does not match and keeping the data column as-is:

id | data
---------------------------------------
42 | {'a_list':['foo','bar'],'count':3}  <-- where 'bar' is added to 'foo', and count is summed
43 | {'a_list':['scrog'],'count':0}
44 | {'a_list':['baz'],'count':4}

As can be seen where id is 42, there is some logic I will have to apply to how the JSON is merged.

My knee-jerk thought is that I'd like to provide a lambda / udf to merge the data column, but I'm not sure how to approach that during a join.

Alternatively, I could break the JSON properties out into columns, something like this. Might that be a better approach?

df1:

id | a_list    | count
----------------------
42 | ['foo']   | 1
43 | ['scrog'] | 0

df2:

id | a_list   | count
---------------------
42 | ['bar']  | 2
44 | ['baz']  | 4

Resulting:

id | a_list         | count
---------------------------
42 | ['foo', 'bar'] | 3
43 | ['scrog']      | 0
44 | ['baz']        | 4

If I went this route, I would then have to merge the columns a_list and count into JSON again under a single column data, but this I can wrap my head around as a relatively simple map function.
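As a rough illustration of that map step, here is a minimal plain-Python sketch. The name pack_row is hypothetical, and the compact separators are only an assumption made so the output has no extra spaces:

```python
import json

def pack_row(a_list, count):
    """Pack the two split-out columns back into one compact JSON string."""
    # separators=(",", ":") drops the spaces json.dumps adds by default
    return json.dumps({"a_list": list(a_list), "count": count},
                      separators=(",", ":"))

print(pack_row(["foo", "bar"], 3))
```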

Update: Expanding on Question

More realistically, I will have n number of DataFrames in a list, e.g. df_list = [df1, df2, df3], all shaped the same. What is an efficient way to perform these same actions on n number of DataFrames?

Update to Update

Not sure how efficient this is, or if there is a more Spark-esque way to do this, but incorporating the accepted answer, this appears to work for the question update:

for i in range(0, (len(validations) - 1)):  

    # set dfs
    df1 = validations[i]['df']
    df2 = validations[(i+1)]['df']

    # joins here...

    # update new_df
    new_df = df2
Consider doing an outer join and then using a pyspark udf (changhsinlee.com/pyspark-udf) to capture the logic you would like to implement - Tom Ron
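The merge logic such a udf would need can be sketched in plain Python. This is only a sketch under assumptions: merge_data is a hypothetical name, the data column is assumed to hold valid JSON strings (double quotes, unlike the tables in the question), and a side that is missing after an outer join is assumed to arrive as None:

```python
import json

def merge_data(left, right):
    """Merge two JSON strings; either side may be None after an outer join."""
    if left is None:
        return right   # id only present in the right DataFrame: keep as-is
    if right is None:
        return left    # id only present in the left DataFrame: keep as-is
    a, b = json.loads(left), json.loads(right)
    merged = {
        "a_list": a.get("a_list", []) + b.get("a_list", []),  # concatenate lists
        "count": a.get("count", 0) + b.get("count", 0),       # sum counts
    }
    return json.dumps(merged, separators=(",", ":"))

print(merge_data('{"a_list":["foo"],"count":1}', '{"a_list":["bar"],"count":2}'))
```

A function like this could then be wrapped with pyspark.sql.functions.udf and applied to the two data columns of the joined DataFrame, after renaming one of them so the names do not clash.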

1 Answer

Here's one way to accomplish your second approach:

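Explode the list column and then unionAll the two DataFrames (unionAll is an alias for union in Spark 2.0+, and both match columns by position). Next, groupBy the "id" column and use pyspark.sql.functions.collect_list() and pyspark.sql.functions.sum(). Note that this works for the sample data because every a_list holds exactly one element: explode repeats count once per list element, so longer lists would inflate the sum, and rows with an empty list would be dropped: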

import pyspark.sql.functions as f
new_df = df1.select("id", f.explode("a_list").alias("a_values"), "count")\
    .unionAll(df2.select("id", f.explode("a_list").alias("a_values"), "count"))\
    .groupBy("id")\
    .agg(f.collect_list("a_values").alias("a_list"), f.sum("count").alias("count"))

new_df.show(truncate=False)
#+---+----------+-----+
#|id |a_list    |count|
#+---+----------+-----+
#|43 |[scrog]   |0    |
#|44 |[baz]     |4    |
#|42 |[foo, bar]|3    |
#+---+----------+-----+

Finally, you can use pyspark.sql.functions.struct() and pyspark.sql.functions.to_json() to convert this intermediate DataFrame into your desired structure:

new_df = new_df.select("id", f.to_json(f.struct("a_list", "count")).alias("data"))
new_df.show(truncate=False)
#+---+----------------------------------+
#|id |data                              |
#+---+----------------------------------+
#|43 |{"a_list":["scrog"],"count":0}    |
#|44 |{"a_list":["baz"],"count":4}      |
#|42 |{"a_list":["foo","bar"],"count":3}|
#+---+----------------------------------+

Update

If you had a list of dataframes in df_list, you could do the following:

from functools import reduce   # for python3
df_list = [df1, df2]
new_df = reduce(lambda a, b: a.unionAll(b), df_list)\
    .select("id", f.explode("a_list").alias("a_values"), "count")\
    .groupBy("id")\
    .agg(f.collect_list("a_values").alias("a_list"), f.sum("count").alias("count"))\
    .select("id", f.to_json(f.struct("a_list", "count")).alias("data"))
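Since explode repeats count once per list element, it can help to have a small reference for the intended result when checking the Spark output on lists with more than one element. The following is a plain-Python sketch, not Spark code: merge_frames is a hypothetical name, each frame is modelled as a list of (id, a_list, count) tuples, and df3_rows is made-up sample data:

```python
import json
from collections import defaultdict
from itertools import chain

def merge_frames(frames):
    """Combine any number of row lists into {id: compact JSON string}."""
    lists = defaultdict(list)
    counts = defaultdict(int)
    # chain.from_iterable plays the role of the union over all frames
    for row_id, a_list, count in chain.from_iterable(frames):
        lists[row_id].extend(a_list)   # concatenate lists per id
        counts[row_id] += count        # add count once per source row
    return {
        row_id: json.dumps({"a_list": lists[row_id], "count": counts[row_id]},
                           separators=(",", ":"))
        for row_id in sorted(counts)
    }

df1_rows = [(42, ["foo"], 1), (43, ["scrog"], 0)]
df2_rows = [(42, ["bar"], 2), (44, ["baz"], 4)]
df3_rows = [(42, ["qux", "quux"], 1)]  # hypothetical third frame

for row_id, data in merge_frames([df1_rows, df2_rows, df3_rows]).items():
    print(row_id, data)
```

On Spark 2.4 or later, one way to get the same per-row semantics without explode would be to aggregate with f.flatten(f.collect_list("a_list")) alongside f.sum("count"), though that is an alternative to the approach above rather than part of it.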