Context
I am trying to use Spark/Scala to efficiently "edit" a large number of parquet files (potentially 50k+). The only edit that needs to be done is deletion, i.e. deleting records/rows that match a given set of row IDs.
The parquet files are stored in S3 as a partitioned DataFrame, where an example partition looks like this:
s3://mybucket/transformed/year=2021/month=11/day=02/*.snappy.parquet
Each partition can have upwards of 100 parquet files, each between 50 MB and 500 MB in size.
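For context, this layout is what a standard partitioned write produces. A minimal sketch of how such a layout might be generated (`df` here is a hypothetical DataFrame with `year`/`month`/`day` columns; the actual job that wrote the data isn't shown):

```scala
// Sketch only: an assumed writer that would produce the layout above.
df.write
  .partitionBy("year", "month", "day")
  .parquet("s3://mybucket/transformed/")
```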
Inputs
We are given a Spark `Dataset[MyClass]` called `filesToModify` which has 2 columns:

- `s3path: String` = the complete S3 path to a parquet file in S3 that needs to be edited
- `ids: Set[String]` = a set of IDs (rows) that need to be deleted from the parquet file located at `s3path`
Example input dataset filesToModify:
| s3path | ids |
|---|---|
| s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet | Set("a", "b") |
| s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet | Set("b") |
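For concreteness, `MyClass` can be thought of as a simple case class matching the two columns above (a sketch; the real definition may carry more fields):

```scala
// Hypothetical definition matching the two columns described above.
case class MyClass(s3path: String, ids: Set[String])
```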
Expected Behaviour
Given `filesToModify` I want to take advantage of parallelism in Spark to do the following for each row (a sequential sketch of the intent follows the list):

- Load the parquet file located at `row.s3path`
- Filter so that we exclude any row whose `id` is in the set `row.ids`
- Count the number of deleted/excluded rows per id in `row.ids` (optional)
- Save the filtered data back to the same `row.s3path` to overwrite the file
- Return the number of deleted rows (optional)
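Ignoring parallelism for a moment, the per-row intent is equivalent to this sequential sketch (it relies on the `deleteIDs` helper shown in the next section; illustration only, since the whole point is to run this work in parallel via Spark):

```scala
// Sequential sketch of the desired per-row behaviour.
filesToModify.collect().foreach { row =>
  val deleted = deleteIDs(row.s3path, row.ids)
  println(s"Deleted $deleted rows from ${row.s3path}")
}
```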
What I have tried
I have tried `filesToModify.map(row => deleteIDs(row.s3path, row.ids))`, where `deleteIDs` looks like this:
```scala
def deleteIDs(s3path: String, ids: Set[String]): Int = {
  import org.apache.spark.sql.functions.{col, not}
  import spark.implicits._

  // Load the single parquet file as a typed Dataset
  val data = spark
    .read
    .parquet(s3path)
    .as[DataModel]

  // Keep only rows whose id is NOT in the deletion set
  val clean = data
    .filter(not(col("id").isInCollection(ids)))

  // Write to a temp directory and then upload to s3 with the same
  // prefix as the original file to overwrite it
  writeToSingleFile(clean, s3path)

  1 // dummy output for simplicity (otherwise it should correspond to the number of deleted rows)
}
```
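For reference, `writeToSingleFile` is my own helper; per the comment above, it roughly amounts to this sketch (the temp path and the copy-back-to-S3 step are simplified assumptions):

```scala
import org.apache.spark.sql.Dataset

// Rough sketch of writeToSingleFile, based on the comment in deleteIDs.
// The temp directory and the S3 copy step are assumptions.
def writeToSingleFile(ds: Dataset[DataModel], s3path: String): Unit = {
  val tmpDir = "/tmp/parquet-edit" // hypothetical temp location
  ds.coalesce(1).write.mode("overwrite").parquet(tmpDir)
  // ...then copy the single part-*.parquet produced in tmpDir to
  // s3path, overwriting the original object (e.g. via the Hadoop
  // FileSystem API); omitted for brevity.
}
```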
However, this leads to a NullPointerException when executed within the map operation. If I execute it on its own outside of the map block it works, but I can't understand why it fails inside the map (something to do with lazy evaluation?).