
The code below fails to capture records that contain null values. In df1 below, record NO 5 has a null value in the NAME field.

According to my required output (OutputDF below), record NO 5 should appear as shown. After running the code below, however, this record is missing from the final output: records with null values are dropped. Everything else works fine.

df1

NO  DEPT NAME   SAL 
1   IT  RAM     1000    
2   IT  SRI     600 
3   HR  GOPI    1500    
5   HW          700

df2

NO  DEPT NAME   SAL 
1   IT   RAM    1000    
2   IT   SRI    900 
4   MT   SUMP   1200    
5   HW   MAHI   700

OutputDF

NO  DEPT NAME    SAL   FLAG
1   IT  RAM     1000   SAME
2   IT  SRI     900    UPDATE
4   MT  SUMP    1200   INSERT
3   HR  GOPI    1500   DELETE
5   HW  MAHI    700    UPDATE

from pyspark.shell import spark
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
sc = spark.sparkContext

filedf1 = spark.read.option("header","true").option("delimiter", ",").csv("C:\\files\\file1.csv")
filedf2 = spark.read.option("header","true").option("delimiter", ",").csv("C:\\files\\file2.csv")
filedf1.createOrReplaceTempView("table1")
filedf2.createOrReplaceTempView("table2")
df1 = spark.sql( "select * from table1" )
df2 = spark.sql( "select * from table2" )

#DELETE
df_d = df1.join(df2, df1.NO == df2.NO, 'left').filter(F.isnull(df2.NO)).select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('DELETE').alias('FLAG'))
print("df_d left:",df_d.show())
#INSERT
df_i = df1.join(df2, df1.NO == df2.NO, 'right').filter(F.isnull(df1.NO)).select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('INSERT').alias('FLAG'))
print("df_i right:",df_i.show())
#SAME
df_s = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) == F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('SAME').alias('FLAG'))
print("df_s inner:",df_s.show())
#UPDATE
df_u = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) != F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('UPDATE').alias('FLAG'))
print("df_u inner:",df_u.show())

df = df_d.union(df_i).union(df_s).union(df_u)
df.show()

Here I am comparing df1 and df2. Records found only in df2 are flagged INSERT; records that are identical in both DataFrames are flagged SAME; records that are in df1 but not in df2 are flagged DELETE; and records that exist in both DataFrames with different values take the df2 values and are flagged UPDATE.
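For reference, the four rules above can be sketched in plain Python, without Spark, to show which flag each sample record is expected to get. This is only an illustration: the dictionaries keyed by NO and the helper name `classify` are assumptions made for the sketch, and `None` stands in for the null NAME of record 5.

```python
# Sample data from df1 and df2, keyed by NO; values are (DEPT, NAME, SAL).
# None represents the null NAME of record 5 in df1.
old_rows = {
    "1": ("IT", "RAM", "1000"),
    "2": ("IT", "SRI", "600"),
    "3": ("HR", "GOPI", "1500"),
    "5": ("HW", None, "700"),
}
new_rows = {
    "1": ("IT", "RAM", "1000"),
    "2": ("IT", "SRI", "900"),
    "4": ("MT", "SUMP", "1200"),
    "5": ("HW", "MAHI", "700"),
}

def classify(old, new):
    """Return {NO: (values, flag)} following the four rules described above."""
    result = {}
    for no in sorted(old.keys() | new.keys()):
        if no not in new:
            result[no] = (old[no], "DELETE")   # only in df1
        elif no not in old:
            result[no] = (new[no], "INSERT")   # only in df2
        elif old[no] == new[no]:
            result[no] = (new[no], "SAME")     # identical in both
        else:
            result[no] = (new[no], "UPDATE")   # differs: take df2 values
    return result

flags = classify(old_rows, new_rows)
for no, (values, flag) in flags.items():
    print(no, values, flag)
```

Python tuple comparison treats `None` as an ordinary value, so record 5 is classified as UPDATE here, which is the behaviour wanted from the Spark code.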

Are you sure it is null and not a space (empty character)? - Ala Tarighati
Sorry for the confusion; the fix should not be limited to nulls. In SQL we would use something like nvl(name,''), and something similar could be used here to get rid of nulls. With a big input file (CSV) we don't know where the null values are, so I need to guard against them in general. Only this validation is pending; the rest of the code above works fine. - RK.

1 Answer

1
votes

There are two issues with the code:

  1. F.concat returns null if any of its inputs is null. Comparing that null with != also yields null, and filter drops rows whose condition is not true, so this part of the code filters out row NO 5:

    .filter(F.concat(df2.NO, df2.DEPT, df2.NAME, df2.SAL) != F.concat(df1.NO, df1.DEPT, df1.NAME, df1.SAL))
    
  2. You are only selecting from df2. That is fine in the example above, but if df2 has a null, the resulting DataFrame will contain that null.
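The first issue can be illustrated with a small pure-Python model of SQL null semantics. This is a sketch and does not call Spark: the helpers `sql_concat`, `sql_eq`, `sql_ne` and `passes_filter` are hypothetical names that imitate how concat, ==, != and filter treat null, with `None` standing in for null.

```python
# Pure-Python model of SQL null semantics; None stands in for SQL NULL.
# It imitates Spark's behaviour for illustration and does not call Spark.

def sql_concat(*values):
    """Like F.concat: the result is NULL if any input is NULL."""
    if any(v is None for v in values):
        return None
    return "".join(str(v) for v in values)

def sql_eq(a, b):
    """Like ==: a comparison involving NULL yields NULL, not True or False."""
    if a is None or b is None:
        return None
    return a == b

def sql_ne(a, b):
    """Like !=: a comparison involving NULL also yields NULL."""
    if a is None or b is None:
        return None
    return a != b

def passes_filter(condition):
    """A filter keeps a row only when its condition is exactly True."""
    return condition is True

row_df1 = ("5", "HW", None, "700")     # record NO 5 in df1, NAME is null
row_df2 = ("5", "HW", "MAHI", "700")   # record NO 5 in df2

left = sql_concat(*row_df1)    # None, because NAME is null
right = sql_concat(*row_df2)   # "5HWMAHI700"

in_same = passes_filter(sql_eq(left, right))
in_update = passes_filter(sql_ne(left, right))
print(in_same, in_update)  # False False
```

Under this model record NO 5 passes neither the SAME filter nor the UPDATE filter, which is why it disappears from the union.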

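You can try concatenating the columns with a UDF like the one below:

from pyspark.sql.functions import udf, struct, coalesce
from pyspark.sql.types import StringType

def concat_cols(row):
    # Join the non-null column values of the row into a single string.
    concat_row = ''.join([str(col) for col in row if col is not None])
    return concat_row

udf_concat_cols = udf(concat_cols, StringType())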

The function concat_cols can be broken down into two parts:

  1. "".join([mylist]) is a string method. It joins everything in the list with the given delimiter, in this case an empty string.
  2. [str(col) for col in row if col is not None] is a list comprehension. It does what it reads: for each column in the row, if the column is not None, append str(col) to the list.
    A list comprehension is just a more Pythonic way of doing this:

    mylist = []
    for col in row:
        if col is not None:
            mylist.append(str(col))
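Because the body of concat_cols is ordinary Python, its behaviour can be checked without Spark by passing plain tuples in place of rows. The sketch below does that, and also shows one caveat of joining with an empty string: two different rows can produce the same text. The variant `concat_cols_sep` with a separator is a hypothetical addition for illustration, not part of the original answer.

```python
def concat_cols(row):
    # Same logic as the UDF body: skip None, stringify the rest, join with "".
    return ''.join([str(col) for col in row if col is not None])

def concat_cols_sep(row, sep="|", null_token=""):
    # Hypothetical variant: keep one slot per column, separated by `sep`,
    # so a null in one column cannot be confused with a shifted value.
    return sep.join(null_token if col is None else str(col) for col in row)

old_row = ("5", None, "700")     # NO, NAME, SAL from df1
new_row = ("5", "MAHI", "700")   # NO, NAME, SAL from df2

print(concat_cols(old_row))  # 5700
print(concat_cols(new_row))  # 5MAHI700

# Caveat: without a separator, different rows can collide.
row_a = ("1", "AB", None)
row_b = ("1", "A", "B")
collides = concat_cols(row_a) == concat_cols(row_b)              # both "1AB"
distinct = concat_cols_sep(row_a) != concat_cols_sep(row_b)      # "1|AB|" vs "1|A|B"
print(collides, distinct)  # True True
```

In Spark itself, F.concat_ws also skips null inputs, so it may be worth trying as a built-in alternative to the UDF; that is a suggestion rather than something tested against the code in this thread.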
    

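You can replace your UPDATE code with:

df_u = (df1
.join(df2, df1.NO == df2.NO, 'inner')
.filter(udf_concat_cols(struct(df1.NO, df1.DEPT, df1.NAME, df1.SAL)) != udf_concat_cols(struct(df2.NO, df2.DEPT, df2.NAME, df2.SAL)))
# Prefer the df2 (updated) value; fall back to df1 when df2 is null.
.select(coalesce(df2.NO, df1.NO).alias('NO'),
        coalesce(df2.DEPT, df1.DEPT).alias('DEPT'),
        coalesce(df2.NAME, df1.NAME).alias('NAME'),
        coalesce(df2.SAL, df1.SAL).alias('SAL'),
        F.lit('UPDATE').alias('FLAG')))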

You should do something similar for your #SAME flag, and break the long lines for readability.


Update:

If df2 always has the correct (updated) result, there is no need to coalesce. The code for this case would be:

df_u = (df1
.join(df2, df1.NO == df2.NO, 'inner')
.filter(udf_concat_cols(struct(df1.NO, df1.DEPT, df1.NAME, df1.SAL)) != udf_concat_cols(struct(df2.NO, df2.DEPT, df2.NAME, df2.SAL)))
.select(df2.NO,
        df2.DEPT,
        df2.NAME,
        df2.SAL,
        F.lit('UPDATE').alias('FLAG')))