Using PySpark, I have some code that runs through a bunch of queries:
for index, query in enumerate(query_map):
    # Each filtered result is written as a directory of CSV part files
    spark_dataframe.filter(query).write.csv(
        's3://OutputBucket/Csvs/Query_{}'.format(index))
I'm new to Spark, but I understand that each partition writes its own CSV file into a directory called Query_[index]. Now I want to collect those files and put them into a single pandas dataframe.
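If I understand correctly, each output directory ends up looking roughly like this (the part file names are illustrative, and many of the part files are zero bytes):

Csvs/Query_0/_SUCCESS
Csvs/Query_0/part-00000-<uuid>-c000.csv
Csvs/Query_0/part-00001-<uuid>-c000.csv   <- often empty
...

This is how I read them back at the moment: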
import boto3
import pandas

s3 = boto3.resource('s3')
my_bucket = s3.Bucket("OutputBucket")

# Get all keys under the output prefix (the keys already include "Csvs/...",
# so only the bucket name needs prepending; pandas reads s3:// paths via s3fs)
csvs = [
    "s3://OutputBucket/" + str(obj.key)
    for obj in my_bucket.objects.filter(Prefix='Csvs/Query')]

to_concat = []
# Turn each part file into a dataframe; Spark's CSV writer emits no header row,
# and the zero-byte part files (plus the _SUCCESS marker) raise EmptyDataError
for csv in csvs:
    try:
        to_concat.append(pandas.read_csv(csv, header=None))
    except pandas.errors.EmptyDataError:
        pass

# Join the dataframes
my_big_dataframe = pandas.concat(to_concat)
The problem is that PySpark writes out a lot of empty files, so my code spends a lot of time trying to read an empty CSV file only to throw an exception.
As I understand it, the df_spark.toPandas() function defeats the purpose of Spark, since it pulls everything into driver memory and does not use the I/O parallelism of the individual partitions. Using coalesce also defeats the purpose of Spark. So writing out a bunch of CSVs and then reading them back in manually is not a terrible idea.
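For reference, these are the alternatives I'm trying to avoid (just a sketch, reusing the query and index variables from the loop above):

# Pulls the whole filtered result into driver memory
pdf = spark_dataframe.filter(query).toPandas()

# Funnels all the data through a single partition before writing
spark_dataframe.filter(query).coalesce(1).write.csv(
    's3://OutputBucket/Csvs/Query_{}'.format(index))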
tl;dr
My question is whether there is a way to skip the empty CSV files that PySpark writes, by either:
Perhaps boto3 can sort the objects by size first, so I only have to iterate until I hit an empty file (something like the sketch after this list)?
Is there any way to do it in PySpark itself, without defeating the point of PySpark?
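This is roughly what I have in mind for the boto3 option (just a sketch; instead of sorting, it filters on obj.size, which the S3 listing already returns, so no empty file ever has to be opened):

import boto3
import pandas

s3 = boto3.resource('s3')
my_bucket = s3.Bucket("OutputBucket")

# The listing already reports each object's size, so the zero-byte
# part files (and the _SUCCESS marker) can be skipped without a GET
non_empty = [
    "s3://OutputBucket/" + obj.key
    for obj in my_bucket.objects.filter(Prefix='Csvs/Query')
    if obj.size > 0]

# Spark's CSV writer emits no header row by default
my_big_dataframe = pandas.concat(
    pandas.read_csv(path, header=None) for path in non_empty)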