
I have two PySpark DataFrames, df1 and df2, each containing millions of records.

df1 is like:

+-------------------+--------+--------+
|               name|state   | pincode|
+-------------------+--------+--------+
|  CYBEX INTERNATION| HOUSTON| 00530  |
|        FLUID POWER| MEDWAY | 02053  |
|   REFINERY SYSTEMS| FRANCE | 072234 |
|    K N ENTERPRISES| MUMBAI | 100010 |
+-------------------+--------+--------+

df2 is like:

+--------------------+--------+--------+
|               name |state   | pincode|
+--------------------+--------+--------+
|FLUID POWER PVT LTD | MEDWAY | 02053  |
|  CYBEX INTERNATION | HOUSTON| 02356  |
|REFINERY SYSTEMS LTD| MUMBAI | 072234 |
+--------------------+--------+--------+

I want to check whether each row of df1 is found in df2, matching on name, state, and pincode. The output should be a new column, Validated, set to 1 if a matching row is found and 0 otherwise, so the resulting DataFrame will be:

+-------------------+--------+--------+----------+
|               name|state   | pincode| Validated|
+-------------------+--------+--------+----------+
|  CYBEX INTERNATION| HOUSTON| 00530  |        0 |
|        FLUID POWER| MEDWAY | 02053  |        1 |
|   REFINERY SYSTEMS| FRANCE | 072234 |        0 |
|    K N ENTERPRISES| MUMBAI | 100010 |        0 |
+-------------------+--------+--------+----------+

In row 1 of df1, the pincode doesn't match any pincode in df2, so Validated = 0.
In row 2, the pincode matches and the state also matches; for the name column I am using Levenshtein distance, which also matches, so Validated = 1.
In row 3, the pincode matches but the state does not, so Validated = 0.
In row 4, the pincode is not present in df2 at all, so Validated = 0.
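The name matching above relies on Levenshtein (edit) distance. As a minimal pure-Python sketch (just to show why the sample names fall within a threshold of 10; Spark's built-in `levenshtein` computes the same quantity):

```python
def levenshtein(a: str, b: str) -> int:
    """Classic dynamic-programming edit distance (insert/delete/substitute)."""
    prev = list(range(len(b) + 1))          # distances from "" to prefixes of b
    for i, ca in enumerate(a, 1):
        cur = [i]                           # distance from a[:i] to ""
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1,                 # delete ca
                           cur[j - 1] + 1,              # insert cb
                           prev[j - 1] + (ca != cb)))   # substitute ca -> cb
        prev = cur
    return prev[-1]

print(levenshtein("FLUID POWER", "FLUID POWER PVT LTD"))        # 8  -> within 10
print(levenshtein("REFINERY SYSTEMS", "REFINERY SYSTEMS LTD"))  # 4  -> within 10
```

"FLUID POWER PVT LTD" is "FLUID POWER" plus the eight characters " PVT LTD", so the distance is 8, which is why a threshold of 10 treats them as the same company.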

I tried this with a pandas DataFrame, iterating over the data with nested ifs, but the data is so huge that iteration is not a good choice.

I am expecting to speed up the process using PySpark, leveraging its parallel processing, with something like:

df_final = df1.withColumn('validated', IF some_expression THEN 1 ELSE 0)  # pseudocode

But I am not able to figure out some_expression, or how to validate all of df1 against df2 on the given columns without any iteration.

I have gone through various Spark questions and similar problems, but none of them helped me. Any help will be appreciated. Please comment if any information is unclear.

Comments:

- You could have used a left join if you had the exact name, but I think you want to match names based on Levenshtein distance. – Sreeram TP
- Definitely, @SreeramTP. The name column is not of fixed length in either DataFrame and its arrangement is also not certain, which is why some fuzzy logic or a Levenshtein distance method has to be applied, I guess. – MD Rijwan
- You can use a left join with levenshtein in the condition. – blackbishop
- Hi @blackbishop, as you suggested I tried df1.join(df2, [df1.pincode == df2.pincode, df1.state == df2.state, levenshtein(df1.name, df2.name) < 10], how='left'), but I am getting a lot of duplicate rows, the merged df2 columns are not required, and as I am new to PySpark, how to create the new column from that join is still an issue. – MD Rijwan

1 Answer


Using Levenshtein distance in the condition of a left join, you can do something like this:

from pyspark.sql.functions import col, levenshtein, lit, when

# Exact match on pincode and state; allow up to 10 edits between names
join_condition = (col("df1.pincode") == col("df2.pincode")) \
                 & (levenshtein(col("df1.name"), col("df2.name")) <= 10) \
                 & (col("df1.state") == col("df2.state"))

result_df = df1.alias("df1").join(df2.alias("df2"), join_condition, "left")

# Keep only df1's columns and flag rows that found a match in df2
result_df.select(
    "df1.*",
    when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated")
).show()

#+-----------------+-------+-------+---------+
#|             name|  state|pincode|validated|
#+-----------------+-------+-------+---------+
#|CYBEX INTERNATION|HOUSTON|  00530|        0|
#|      FLUID POWER| MEDWAY|  02053|        1|
#| REFINERY SYSTEMS| FRANCE| 072234|        0|
#|  K N ENTERPRISES| MUMBAI| 100010|        0|
#+-----------------+-------+-------+---------+