
I am trying to iterate over the rows of one PySpark DataFrame, use the values in each row to perform operations (filter, select) on a second PySpark DataFrame, and then bind all the results together. Perhaps this is best illustrated with an example:

DF1

id   name   which_col
1    John   col1
2    Jane   col3
3    Bob    col2
4    Barb   col1

DF2

name  col1  col2  col3
Bob   78    43    54
Bob   23    65    34
Bob   12    98    75
John  45    54    54
John  75    43    12
Jane  24    45    21
...

The steps I'd like to perform for every row in DF1 are:

  1. Take the value in "name" and use it to filter DF2 (e.g., for row 1, filter DF2 to only the "John" rows).
  2. Then select the corresponding DF2 column based on the DF1 value in "which_col" (e.g., for John, select col1 in DF2, whereas for Jane it would be col3).
  3. Repeat for each row of DF1.
  4. Bind all results together into a final DF.
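
For reference, the two example DataFrames above can be created like this (a minimal sketch, assuming an active SparkSession called spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# DF1: one row per person, naming which DF2 column to pull
df1 = spark.createDataFrame(
    [(1, 'John', 'col1'), (2, 'Jane', 'col3'), (3, 'Bob', 'col2'), (4, 'Barb', 'col1')],
    ['id', 'name', 'which_col']
)

# DF2: multiple rows per person, one value column per possible which_col
df2 = spark.createDataFrame(
    [('Bob', 78, 43, 54), ('Bob', 23, 65, 34), ('Bob', 12, 98, 75),
     ('John', 45, 54, 54), ('John', 75, 43, 12), ('Jane', 24, 45, 21)],
    ['name', 'col1', 'col2', 'col3']
)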

1 Answer


You can unpivot (stack) df2 before joining:

# Unpivot df2's value columns into (which_col, value) pairs, then left-join on both keys.
# The generated expression is:
#   stack(3, 'col1', col1, 'col2', col2, 'col3', col3) as (which_col, value)
result = df1.join(
    df2.selectExpr(
        'name',
        'stack(3, ' + ', '.join(["'%s', %s" % (c, c) for c in df2.columns[1:]]) + ') as (which_col, value)'
    ),
    ['name', 'which_col'],
    'left'
)

result.show()
+----+---------+---+-----+
|name|which_col| id|value|
+----+---------+---+-----+
|John|     col1|  1|   75|
|John|     col1|  1|   45|
|Jane|     col3|  2|   21|
| Bob|     col2|  3|   98|
| Bob|     col2|  3|   65|
| Bob|     col2|  3|   43|
|Barb|     col1|  4| null|
+----+---------+---+-----+
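
If you'd rather not hardcode the number of value columns, the stack expression can be built from df2.columns. This is a sketch of the same approach, not a different technique (on Spark 3.4+ the built-in DataFrame.unpivot achieves the same unpivot without the SQL string):

# Build the stack() expression dynamically from df2's value columns.
value_cols = df2.columns[1:]  # every column except 'name'
stack_expr = 'stack({}, {}) as (which_col, value)'.format(
    len(value_cols),
    ', '.join("'{0}', {0}".format(c) for c in value_cols)
)

unpivoted = df2.selectExpr('name', stack_expr)
result = df1.join(unpivoted, ['name', 'which_col'], 'left')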