1
votes

Using PySpark, I have some code that runs through a bunch of queries.

for index, query in enumerate(query_map):  
  spark_dataframe.filter(
       query).write.csv('s3://OutputBucket/Csvs/Query_{}'.format(index)

I'm new to spark, but I understand that each partition is writing individual csv files to a directory called Query_[index]. Now I want to collect those files and put them into a pandas dataframe.

import boto3
import pandas
s3 = boto3.resource('s3')
my_bucket = s3.Bucket("OutputBucket")
#Get all csv names
csvs = [
    "s3://OutputBucket/Csvs/"+\
    str(i.key) for i in my_bucket.objects.filter(Prefix='Query/')] 
to_concat = []
#Turn them into a dataframe
for csv in csvs:
    try:
        to_put_in.append(pandas.read_csv(csv))
    except pandas.errors.EmptyDataError:
        pass
#Join dataframe
my_big_dataframe = pandas.concat(to_concat)

The problem is that Pyspark writes a lot of empty files out. So my code spends a lot of time trying to read in an empty csv file only to throw an exception.

As I understand, the df_spark.toPandas() function defeats the purpose of spark since it puts it into driver memory and does not utilize the IO parallelization of each partition. It also defeats the purpose of spark to use coalesce. So writing to a bunch of csvs and then manually reading them in is not a terrible idea.

tl;dr

My question is if there is a way to skip those empty csv files that pyspark writes by either:

  1. Perhaps boto3 can sort them first by size then just iterate until we fine and empty file?

  2. Is there any way in PySpark to do it without defeating the point of pyspark?

1
Ia your end goal to concatenate all of the part files into one csv? You can do that from the command line, if that's an option. See this answer.pault
So just save to hdfs then concatenate from the head node?jwillis0720
That's usually the way I do it, but I don't know what your use case is.pault

1 Answers

0
votes

I faced a similar problem few months ago. Used something like this

# get the number of non-empty partitions in dataframe df
numNonEmptyPartitions = (df.rdd.glom().map(lambda x: 1 if len(x)>0 else 0).
                                reduce(lambda x,y: x+y))

df = df.coalesce(numNonEmptyPartitions)

Now, you will have all non empty partitions.