1
votes

I have a very large dataframe in pyspark. It has over 10 million rows and over 30 columns.

What is the best and efficient method to search the entire dataframe for a given list of values and remove the row which contains that value?

The given list of values: 

list=['1097192','10727550','1098754']

The dataframe(df) is :
 +---------+--------------+---------------+---------+------------+
 |   id    |  first_name  |   last_name   | Salary  | Verifycode |
 +---------+--------------+---------------+---------+------------+
 |    1986 | Rollie       | Lewin         | 1097192 |   42254172 | -Remove Row
 |  289743 | Karil        | Sudron        | 2785190 |    3703538 |
 |    3864 | Massimiliano | Dallicott     | 1194553 |   23292573 |
 |   49074 | Gerry        | Grinnov       | 1506584 |   62291161 |
 | 5087654 | Nat          | Leatherborrow | 1781870 |   55183252 |
 |     689 | Thaine       | Tipple        | 2150105 |   40583249 |
 |    7907 | Myrlene      | Croley        | 2883250 |   70380540 |
 |     887 | Nada         | Redier        | 2676139 |   10727550 | -Remove Row
 |   96533 | Sonny        | Bosden        | 1050067 |   13110714 |
 | 1098754 | Dennie       | McGahy        | 1804487 |     927935 | -Remove Row
 +---------+--------------+---------------+---------+------------+

If it was a smaller dataframe I could use collect() or toLocalIterator() functions and then iterate over the rows and remove it based on list values.

Since it is a very large dataframe what is the best way to solve this?

I have come up with this solution now but is there a better way:

column_names = df.schema.names
for name in column_names:
    df = df.filter(~col(name).isin(list))
1
Do you need to retain all columns on your dataframe before doing the filter? In which format is this data stored? Is it partitioned? Where are you executing the code (in local machine, cluster, etc)? - Mitodina
I need to retain all the columns as the value to be searched can be in any column of the dataframe. The code is executed on a cluster and the final output format is csv. - navin

1 Answers

0
votes

You got the correct approach of filtering the Dataframe using filter and isin function. You can use isin function if the list is small (in few thousands not millions). Also make sure that your dataframe is partitioned to at least 3*number of CPUs on the executors. It is a must to have lot of partitions without that parallelism will suffer.

I am comfortable with Scala so please take the concept from the below code. You need to build a Column object by combining all columns to be filtered on. Then provide that final column object on the dataframe.filter

column_names = df.schema.names
colFinal // initialize with 1 column name as col("colName").isin(list)
for name in column_names:
    colFinal = colFinal.or(col(name).isin(list))

df = df.filter(!colFinal) // apply negation of final column object