
I would like to implement the requirement below using Spark DataFrames to compare two text/CSV files. Ideally, File1.txt should be compared with File2.txt, and the result should be written to another text file with a flag (SAME/UPDATE/INSERT/DELETE).

UPDATE - if a record's values are changed in File2 compared to File1
INSERT - if a new record exists in File2 (not in File1)
DELETE - if the record exists only in File1 (not in File2)
SAME - if the same record exists in both files

File1.txt
NO  DEPT NAME   SAL 
1   IT  RAM     1000    
2   IT  SRI     600 
3   HR  GOPI    1500    
5   HW  MAHI    700 

File2.txt
NO  DEPT NAME   SAL
1   IT  RAM     1000
2   IT  SRI     900
4   MT  SUMP    1200
5   HW  MAHI    700

Outputfile.txt
NO  DEPT NAME    SAL   FLAG
1   IT  RAM     1000    S
2   IT  SRI     900     U
4   MT  SUMP    1200    I
5   HW  MAHI    700     S
3   HR  GOPI    1500    D

So far, I have written the code below, but I am not able to proceed further. Please help.

from pyspark.shell import spark
sc = spark.sparkContext
df1 = spark.read.option("header","true").option("delimiter", ",").csv("C:\\inputs\\file1.csv")
df2 = spark.read.option("header","true").option("delimiter", ",").csv("C:\\inputs\\file2.csv")

df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")

sqlDF1 = spark.sql( "select * from table1" )
sqlDF2 = spark.sql( "select * from table2" )

# join on the key column 'NO' (the files have no 'id' column)
leftJoinDF = sqlDF1.join(sqlDF2, 'NO', how='left')
rightJoinDF = sqlDF1.join(sqlDF2, 'NO', how='right')
innerJoinDF = sqlDF1.join(sqlDF2, 'NO')

Is there a way to merge the data after performing the left, right, and inner joins? Would that give the desired output, or is there a better approach?

Thanks,


2 Answers


You can find my solution below. I create four DataFrames for the SAME/UPDATE/INSERT/DELETE cases and then union them.

>>> from functools import reduce
>>> from pyspark.sql import DataFrame
>>> import pyspark.sql.functions as F

>>> df1 = sc.parallelize([
...     (1,'IT','RAM',1000),    
...     (2,'IT','SRI',600),
...     (3,'HR','GOPI',1500),    
...     (5,'HW','MAHI',700)
...     ]).toDF(['NO','DEPT','NAME','SAL'])
>>> df1.show()
+---+----+----+----+
| NO|DEPT|NAME| SAL|
+---+----+----+----+
|  1|  IT| RAM|1000|
|  2|  IT| SRI| 600|
|  3|  HR|GOPI|1500|
|  5|  HW|MAHI| 700|
+---+----+----+----+

>>> df2 = sc.parallelize([
...     (1,'IT','RAM',1000),    
...     (2,'IT','SRI',900),
...     (4,'MT','SUMP',1200),    
...     (5,'HW','MAHI',700)
...     ]).toDF(['NO','DEPT','NAME','SAL'])
>>> df2.show()
+---+----+----+----+
| NO|DEPT|NAME| SAL|
+---+----+----+----+
|  1|  IT| RAM|1000|
|  2|  IT| SRI| 900|
|  4|  MT|SUMP|1200|
|  5|  HW|MAHI| 700|
+---+----+----+----+

#DELETE - NO exists only in file1
>>> df_d = df1.join(df2, df1.NO == df2.NO, 'left').filter(F.isnull(df2.NO)).select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('D').alias('FLAG'))
#INSERT - NO exists only in file2
>>> df_i = df1.join(df2, df1.NO == df2.NO, 'right').filter(F.isnull(df1.NO)).select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('I').alias('FLAG'))
#SAME - NO matched and all column values are equal
>>> df_s = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) == F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).\
...     select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('S').alias('FLAG'))
#UPDATE - NO matched but some values differ
>>> df_u = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) != F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).\
...     select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('U').alias('FLAG'))


>>> dfs = [df_s,df_u,df_i,df_d]
>>> df = reduce(DataFrame.unionAll, dfs)
>>> 
>>> df.show()
+---+----+----+----+----+
| NO|DEPT|NAME| SAL|FLAG|
+---+----+----+----+----+
|  5|  HW|MAHI| 700|   S|
|  1|  IT| RAM|1000|   S|
|  2|  IT| SRI| 900|   U|
|  4|  MT|SUMP|1200|   I|
|  3|  HR|GOPI|1500|   D|
+---+----+----+----+----+
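
Since the requirement asks for the result in a separate text file, you can then write the union out. A minimal sketch (the output path is an assumption):

>>> # coalesce to 1 partition so the result lands in a single file; path is an assumption
>>> df.coalesce(1).write.option("header", "true").csv("C:\\outputs\\result")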

You can use an 'outer' join after concatenating all the columns first, then create a UDF to assign the flags.

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

df = spark.createDataFrame([
     (1,'IT','RAM',1000),
     (2,'IT','SRI',600),
     (3,'HR','GOPI',1500),
     (5,'HW','MAHI',700)],
     ['NO', 'DEPT', 'NAME', 'SAL'])

df1 = spark.createDataFrame([
     (1,'IT','RAM',1000),
     (2,'IT','SRI',900),
     (4,'MT','SUMP',1200),
     (5,'HW','MAHI',700)],
     ['NO', 'DEPT', 'NAME', 'SAL'])

def flags(x, y):
    # x = concatenated row from file1, y = concatenated row from file2
    if not x:           # key only in file2 -> INSERT
        return y + '-I'
    if not y:           # key only in file1 -> DELETE
        return x + '-D'
    if x == y:          # identical rows -> SAME
        return x + '-S'
    return y + '-U'     # key matched, values differ -> UPDATE

_cols = df.columns
flag_udf = F.udf(flags, StringType())


df = df.select(['NO']+ [F.concat_ws('-', *[F.col(_c) for _c in df.columns]).alias('f1')])\
        .join(df1.select(['NO']+ [F.concat_ws('-', *[F.col(_c1) for _c1 in df1.columns]).alias('f2')]), 'NO', 'outer')\
        .select(flag_udf('f1','f2').alias('combined'))
df.show()

The result will be:

+----------------+                                                              
|        combined|
+----------------+
| 5-HW-MAHI-700-S|
| 1-IT-RAM-1000-S|
|3-HR-GOPI-1500-D|
|  2-IT-SRI-900-U|
|4-MT-SUMP-1200-I|
+----------------+

Finally, split the combined column.

split_col = F.split(df['combined'], '-')
df = df.select([split_col.getItem(i).alias(s) for i,s in enumerate(_cols+['FLAG'])])

df.show()

You get the desired output:

+---+----+----+----+----+                                                       
| NO|DEPT|NAME| SAL|FLAG|
+---+----+----+----+----+
|  5|  HW|MAHI| 700|   S|
|  1|  IT| RAM|1000|   S|
|  3|  HR|GOPI|1500|   D|
|  2|  IT| SRI| 900|   U|
|  4|  MT|SUMP|1200|   I|
+---+----+----+----+----+
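
One caveat: the concatenation approach breaks if any field value itself contains the '-' separator, since both the comparison and the final split rely on it. As a sketch of the same flag logic without string concatenation, here is a full outer join with when/otherwise (df and df1 refer to fresh copies of the two original DataFrames, since df was reassigned above):

j = df.alias('a').join(df1.alias('b'), F.col('a.NO') == F.col('b.NO'), 'outer')
result = j.select(
    F.coalesce(F.col('b.NO'), F.col('a.NO')).alias('NO'),
    F.coalesce(F.col('b.DEPT'), F.col('a.DEPT')).alias('DEPT'),
    F.coalesce(F.col('b.NAME'), F.col('a.NAME')).alias('NAME'),
    F.coalesce(F.col('b.SAL'), F.col('a.SAL')).alias('SAL'),
    F.when(F.col('a.NO').isNull(), 'I')         # key only in file2
     .when(F.col('b.NO').isNull(), 'D')         # key only in file1
     .when((F.col('a.DEPT') == F.col('b.DEPT')) &
           (F.col('a.NAME') == F.col('b.NAME')) &
           (F.col('a.SAL') == F.col('b.SAL')), 'S')  # identical rows
     .otherwise('U')                             # values changed
     .alias('FLAG'))
result.show()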