2 votes

I have two data frames in PySpark. I am checking the data in dataframe A, and if a column is null, I replace the null value with the same column from dataframe B.

Both dataframes have a unique ID column; I am joining the dataframes on it, and the code below works fine.

updated_data = TABLE_BY_updated_date_unique.select(
    'name_id_forwarded', 'name_id', 'name_id_org',
    'first', 'last', 'passport', 'PHONE', 'EMAIL')
most_attributes_data = Most_attributes.select(
    'name_id_forwarded', 'name_id', 'name_id_org',
    'first', 'last', 'passport', 'PHONE', 'EMAIL')

final_df = updated_data.alias('a').join(most_attributes_data.alias('b'), on=['name_id_forwarded'], how='left')\
    .select(
        'a.name_id_forwarded', 'a.name_id', 'a.name_id_org',
        f.when(f.isnull(f.col('a.first')), f.col('b.first')).otherwise(f.col('a.first')).alias('first'),
        f.when(f.isnull(f.col('a.last')), f.col('b.last')).otherwise(f.col('a.last')).alias('last'),
        f.when(f.isnull(f.col('a.passport')), f.col('b.passport')).otherwise(f.col('a.passport')).alias('passport'),
        f.when(f.isnull(f.col('a.PHONE')), f.col('b.PHONE')).otherwise(f.col('a.PHONE')).alias('PHONE'),
        f.when(f.isnull(f.col('a.EMAIL')), f.col('b.EMAIL')).otherwise(f.col('a.EMAIL')).alias('EMAIL')
    )

I have more than 40 columns, and I don't want to repeat the following code for each column: f.when(f.isnull(f.col('a.EMAIL')), f.col('b.EMAIL')).otherwise(f.col('a.EMAIL')).alias('EMAIL')

  • Can you please help me loop this syntax so that I can process all the columns without repeating it?

2 Answers

1 vote

Use the coalesce function for this case: dynamically generate the expression list, then pass it to .select.

Example:

from pyspark.sql.types import *
from pyspark.sql.functions import *

df=spark.createDataFrame([(1,'a',10),(2,None,100),(3,'b',10000)],['id','name','salary'])
df.show()
#+---+----+------+
#| id|name|salary|
#+---+----+------+
#|  1|   a|    10|
#|  2|null|   100|
#|  3|   b| 10000|
#+---+----+------+
df1=spark.createDataFrame([(1,'a',20),(2,'b',None),(3,None,100)],['id','name','salary'])

df1.show()
#+---+----+------+
#| id|name|salary|
#+---+----+------+
#|  1|   a|    20|
#|  2|   b|  null|
#|  3|null|   100|
#+---+----+------+

expr=[i for i in df.columns if i=='id'] + [coalesce(f'df1.{i}',f'df.{i}').alias(f'{i}') for i in df.columns if i !='id']

#['id', Column<b'coalesce(df1.name, df.name) AS `name`'>, Column<b'coalesce(df1.salary, df.salary) AS `salary`'>]

df.alias("df").\
join(df1.alias("df1"),['id'],'left').\
select(*expr).\
show()
#+---+----+------+
#| id|name|salary|
#+---+----+------+
#|  1|   a|    20|
#|  3|   b|   100|
#|  2|   b|   100|
#+---+----+------+

UPDATE:

We are using the coalesce function, which returns the first non-null value among its arguments.

In this case the value from DataFrame b is preferred, falling back to the a value when b's is null. In coalesce we write it as coalesce(b.first, a.first):

  • if b.first is null, then a.first is used;
  • otherwise, b.first is used.
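The ordering rule can be illustrated with a pure-Python analogue of coalesce (a sketch for intuition only; Spark's coalesce operates on Column expressions, not plain values):

```python
def coalesce(*values):
    # Return the first value that is not None (i.e. not SQL null), else None.
    return next((v for v in values if v is not None), None)

print(coalesce(None, 'from_a'))      # 'from_a' (b's value is null, so a's is used)
print(coalesce('from_b', 'from_a'))  # 'from_b' (b's value is not null, so it wins)
```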

The list comprehension [coalesce(f'df1.{i}',f'df.{i}').alias(f'{i}') for i in df.columns if i != 'id'] dynamically creates a coalesce expression over the df1 (b) and df (a) DataFrames for every column except id, since we are joining on that column. If you want a's value to take priority instead, swap the argument order: coalesce(f'df.{i}', f'df1.{i}').

Then we prepend the id column with [i for i in df.columns if i == 'id'].

With the expression list built, we execute it after the join using .select(*expr).
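The same expression list can also be built as plain SQL strings (for use with .selectExpr instead of .select), which makes the construction easy to check without a Spark session; a sketch using the toy columns above:

```python
# Build the coalesce expressions as SQL strings; df1 takes priority over df,
# matching the coalesce(df1.x, df.x) argument order used above.
columns = ['id', 'name', 'salary']
key = 'id'

expr = [key] + [f"coalesce(df1.{c}, df.{c}) AS {c}" for c in columns if c != key]

print(expr)
# ['id', 'coalesce(df1.name, df.name) AS name', 'coalesce(df1.salary, df.salary) AS salary']
```

Calling .selectExpr(*expr) after the same aliased join should then produce the equivalent result.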

0 votes

Define the column lists:

col_list_1 = ['a.name_id', 'a.SUM', 'a.full_name', 'a.updated']

col_list_2 = ['first_name', 'last_name', 'email', 'phone_number']

colExpr = col_list_1 + list(map(lambda x: "nvl(a.{},b.{}) as {}".format(x, x, x), col_list_2))

Unique_With_AllCols = TABLE_BY_updated_date_unique.alias('a').\
    join(Most_attributes.alias('b'), on=['name_id_forwarded'],
         how='left').selectExpr(*colExpr)
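The expression construction itself can be checked without a Spark session; a minimal sketch of the same nvl string building, using a list comprehension in place of map/lambda:

```python
col_list_1 = ['a.name_id', 'a.SUM', 'a.full_name', 'a.updated']
col_list_2 = ['first_name', 'last_name', 'email', 'phone_number']

# nvl(a.x, b.x): keep a's value, fall back to b's when a's value is null
colExpr = col_list_1 + ["nvl(a.{0},b.{0}) as {0}".format(x) for x in col_list_2]

print(colExpr[4])
# nvl(a.first_name,b.first_name) as first_name
```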