I have zip files stored in Amazon S3 and a Python list of their paths, like ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]. I need to unzip all of these files using a Spark cluster and store all the CSV files in a Delta table. I would like to find a faster processing approach than my current one:
1) I iterate over the Python list with a for loop.
2) I get each zip file from S3 using boto3's s3.bucket.Object(file).
3) I unzip the files with the following code:
import io
import boto3
import shutil
import zipfile

# outputZip is the local directory on the Driver Node where the CSVs are extracted
for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
    obj = s3.bucket.Object(file)
    # download the zip into memory and extract it on the Driver Node's local disk
    with io.BytesIO(obj.get()["Body"].read()) as tf:
        tf.seek(0)
        with zipfile.ZipFile(tf, mode='r') as zipf:
            for subfile in zipf.namelist():
                zipf.extract(subfile, outputZip)

# step 4: copy the extracted CSVs from the driver's local disk to DBFS
dbutils.fs.cp("file:///databricks/driver/{0}".format(outputZip), "dbfs:" + outputZip, True)

# step 6: clean up the driver's local disk and DBFS after the ingest
shutil.rmtree(outputZip)
dbutils.fs.rm("dbfs:" + outputZip, True)
4) The files are unzipped on the Driver Node, so the executors can't reach them (I haven't found a way to make that work), which is why I move all the CSV files to DBFS with dbutils.fs.cp() (a sketch of one alternative I'm considering appears after this list).
5) I read all the CSV files from DBFS into a PySpark DataFrame and write them to a Delta table:
df = self.spark.read.option("header", "true").csv("dbfs:" + file)
df.write.format("delta").save(path)
6) I delete the data from DBFS and the Driver Node
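Regarding step 4, would something like this work to skip the copy step entirely? A minimal sketch, assuming the /dbfs FUSE mount is available on the driver (as on most Databricks runtimes); obj is the boto3 object for one zip from the loop above and the target directory is a placeholder:

import io
import zipfile

extract_dir = "/dbfs/tmp/unzipped"   # hypothetical target; /dbfs/... is the DBFS FUSE mount on the driver

# extract straight into DBFS, so no dbutils.fs.cp from the driver's local disk is needed
with io.BytesIO(obj.get()["Body"].read()) as tf:
    with zipfile.ZipFile(tf, mode='r') as zipf:
        zipf.extractall(extract_dir)

# the executors can now read the same path through the dbfs: scheme
df = spark.read.option("header", "true").csv("dbfs:/tmp/unzipped")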
So, my goal is to ingest these zip files from S3 into a Delta table in less time than my current process takes. I suppose I can parallelize some of these steps, such as step 1. I would also like to avoid the copy to DBFS, because I don't need the data there, and I need to remove the CSV files after each ingest into the Delta table so the Driver Node's disk doesn't fill up. Any advice?
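Something like this is what I imagine for parallelizing step 1, but I'm not sure it's the right approach. A minimal sketch, assuming the executors can reach S3 with boto3 (e.g. through an instance profile) and that spark is the usual Databricks session; the staging prefix is a placeholder:

import io
import zipfile
import boto3

zip_urls = ["s3://mybucket/file1.zip", "s3://mybucket/fileN.zip"]  # the full list of zip paths
staging_prefix = "staging/unzipped"                                # hypothetical S3 prefix for the extracted CSVs

def extract_to_s3(url):
    # runs on an executor: download one zip, re-upload its CSVs to the staging prefix in S3
    bucket, key = url.replace("s3://", "").split("/", 1)
    s3 = boto3.resource("s3")  # created inside the task, since boto3 objects are not serializable
    body = s3.Object(bucket, key).get()["Body"].read()
    with zipfile.ZipFile(io.BytesIO(body)) as zipf:
        for name in zipf.namelist():
            if name.lower().endswith(".csv"):
                s3.Object(bucket, "{0}/{1}".format(staging_prefix, name)).put(Body=zipf.read(name))

# one zip per task, spread across the executors instead of the driver
spark.sparkContext.parallelize(zip_urls, len(zip_urls)).foreach(extract_to_s3)

# read every extracted CSV straight from S3 and append to the Delta table
(spark.read.option("header", "true")
      .csv("s3://mybucket/{0}/".format(staging_prefix))
      .write.format("delta").mode("append").save(path))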