Say I have two dataframes with 4 columns each. The first 3 columns are string types, and the 4th column is an array type. I would like to concatenate these two dataframes so that the resulting dataframe will fulfill the following:

In rows where the first 3 columns' values are identical between the two dataframes, the row in the result dataframe will contain the identical values, and the array column will contain a union of all the values in each of the original dataframes' 4th column arrays.

Rows that have no matching partner (on the first 3 columns) in the other dataframe will appear unchanged in the result dataframe.

Example:

DF1 = [
Row(str1="StringA", str2="StringB", str3="StringC", arr=["array_member_a"]),
Row(str1="String1", str2="String2", str3="String3", arr=["array_member_1"])]

DF2 = [ 
Row(str1="StringA", str2="StringB", str3="StringC", arr=["array_member_d"]),
Row(str1="String1", str2="String8", str3="String9", arr=["array_member_x"])]

result_DF = [
Row(str1="StringA", str2="StringB", str3="StringC", arr=["array_member_a", "array_member_d"]),
Row(str1="String1", str2="String2", str3="String3", arr=["array_member_1"]),
Row(str1="String1", str2="String8", str3="String9", arr=["array_member_x"])]

1 Answer

Question: What's the difference between a join and a group-by? Answer: Only the axis of the aggregation.

It's much simpler to aggregate distinct rows than it is to aggregate distinct columns, so let's reinterpret your problem.

First we "join" the dataframes on the row axis with a union. As opposed to a join, where the rows we want to aggregate will be in different columns on one row, this will produce multiple rows with a single column to aggregate:

on = ['_1', '_2', '_3']

(df1
 .union(df2)
 .collect())

[Row(_1='StringA', _2='StringB', _3='StringC', _4=['array_member_a']),
 Row(_1='String1', _2='String2', _3='String3', _4=['array_member_1']),
 Row(_1='StringA', _2='StringB', _3='StringC', _4=['array_member_d']),
 Row(_1='String1', _2='String8', _3='String9', _4=['array_member_x'])]

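Now we group on the first three columns and collect the array values of each group. collect_set returns an array of arrays, so the result must be flattened before being made distinct (flatten and array_distinct require Spark 2.4 or later):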

from pyspark.sql.functions import array_distinct, collect_set, flatten

(df1
 .union(df2)
 .groupby(on).agg(array_distinct(flatten(collect_set('_4'))).alias('_4'))
 .collect())

[Row(_1='String1', _2='String2', _3='String3', _4=['array_member_1']),
 Row(_1='StringA', _2='StringB', _3='StringC', _4=['array_member_a', 'array_member_d']),
 Row(_1='String1', _2='String8', _3='String9', _4=['array_member_x'])]
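
To check the logic without starting a Spark session, the same union-then-group-by idea can be modeled in plain Python. This is only a sketch of the semantics, not a replacement for the Spark code: the helper name merge_rows and the tuple layout (str1, str2, str3, arr) are assumptions made for illustration.

```python
from collections import defaultdict

# Sample rows mirroring the question: (str1, str2, str3, arr)
df1_rows = [
    ("StringA", "StringB", "StringC", ["array_member_a"]),
    ("String1", "String2", "String3", ["array_member_1"]),
]
df2_rows = [
    ("StringA", "StringB", "StringC", ["array_member_d"]),
    ("String1", "String8", "String9", ["array_member_x"]),
]


def merge_rows(left, right):
    """Hypothetical helper: stack both inputs, group by the first three
    fields, and merge the array field without duplicates."""
    grouped = defaultdict(list)
    # "union": concatenate the rows of both inputs on the row axis
    for *key, arr in left + right:
        merged = grouped[tuple(key)]
        # "flatten" + "distinct": append only members not seen yet
        for member in arr:
            if member not in merged:
                merged.append(member)
    # Rebuild one row per distinct key
    return [(*key, arr) for key, arr in grouped.items()]


result = merge_rows(df1_rows, df2_rows)
for row in result:
    print(row)
```

Note that this sketch keeps rows and array members in insertion order, whereas Spark guarantees no particular order for the grouped rows or for the elements produced by collect_set. Sort the result explicitly if the order matters.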