0 votes

I have written a PySpark job to load files from an S3 bucket. The bucket contains a very large number of small files, and I am reading them one by one because I need to add a column whose value is the S3 path of the file each row came from. Because of this, the Spark job spends most of its time iterating over the files.

Below is the code for that:

from pyspark.sql.functions import lit

data = None
for filepathins3 in awsfilepathlist:  # one read per file, so each file's path can be attached
    df = spark.read.parquet(filepathins3).withColumn("path_s3", lit(filepathins3))
    data = df if data is None else data.union(df)

The code above takes a long time because it reads the files one by one. If I pass the whole list of file paths to a single read, the Spark job finishes quickly, but with that approach I cannot add a column holding each row's source file path to the DataFrame.

Is there a way to solve this within the PySpark job itself, rather than writing a separate program to read the files, combine them, and then load them into Spark?

I was going to suggest loading awsfilepathlist into a DataFrame and then using mapPartitions, but I cannot find this in PySpark. You may be able to create an RDD out of awsfilepathlist (sc.parallelize(awsfilepathlist)) and then use RDD.mapPartitions. When you want the RDD returned as a DataFrame, just use the .toDF() method on the RDD. – JZimmerman
Now that I think about it, if this runs quickly when you provide a list of paths, perhaps the issue isn't with Spark but rather with how you are loading the paths in the first place. Have you tried loading the paths into a list first (for filepathins3 in list(awsfilepathlist):)? – JZimmerman
Yes, the issue is with the iteration, not with Spark. Passing a list of files helps, but I am adding a column to each file that contains its S3 file path as the value, which is why I have to iterate over each file, create the column, and fill in the path. – Jay
If there is something in the data that is unique to each file, you could load all the files into a single DataFrame and then add the paths based on that. – KGS
The data keeps changing, but the schema is fixed for all files. – Jay

2 Answers

2 votes

If the goal is to get the file path, Spark already has a function for this, input_file_name():

from pyspark.sql.functions import input_file_name

data = spark.read.parquet('s3path').withColumn("input_file", input_file_name())
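As a quick sanity check (assuming the read above succeeded; the column name input_file comes from the snippet), you can count rows per source file:

# Rows per source file; useful to verify every small file was picked up
data.groupBy("input_file").count().show(truncate=False)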
1 vote

You can simply do:

spark.read.parquet(*awsfilepathlist)
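This reads all the files in a single job. If the per-row source path is still needed, it can be combined with input_file_name() from the other answer. A minimal sketch, assuming awsfilepathlist holds the S3 file paths and reusing the question's column name path_s3:

from pyspark.sql.functions import input_file_name

# Read every file in one pass and record each row's source file path
data = (
    spark.read.parquet(*awsfilepathlist)
         .withColumn("path_s3", input_file_name())
)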