1 vote

I have a Databricks notebook set up that works as follows (the read and conversion steps are sketched after the list):

  • PySpark connection details to the Blob storage account
  • Read the file into a Spark DataFrame
  • Convert to a pandas DataFrame
  • Data modelling on the pandas DataFrame
  • Convert back to a Spark DataFrame
  • Write to Blob storage as a single file
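
Roughly, the read and conversion steps (which are not in the code below) look like this; the wasbs paths, column names, and the modelling line are just placeholders:

## Sketch of the read / convert / write flow (placeholder names)
input_path = "wasbs://container@account.blob.core.windows.net/Databricks_Files/input"
output_path = "wasbs://container@account.blob.core.windows.net/Databricks_Files/out"

## Read the source file into a Spark DataFrame
df = spark.read.format("csv").option("header", "true").load(input_path)

## Convert to pandas for the modelling step (placeholder calculation)
pandas_df = df.toPandas()
pandas_df["total"] = pandas_df["quantity"].astype(float) * pandas_df["price"].astype(float)

## Convert back to Spark and write out as a single file
spark.createDataFrame(pandas_df).coalesce(1) \
  .write.mode("overwrite").option("header", "true").csv(output_path)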

My problem is that you cannot name the output file, and I need a static CSV filename.

Is there a way to rename this in PySpark?

## Blob Storage account information
storage_account_name = ""
storage_account_access_key = ""

## File location and File type
file_location = "path/.blob.core.windows.net/Databricks_Files/input"
file_location_new = "path/.blob.core.windows.net/Databricks_Files/out"
file_type = "csv"

## Connection string to connect to blob storage
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)

This is followed by outputting the file after the data transformation:

dfspark.coalesce(1).write.format('com.databricks.spark.csv') \
  .mode('overwrite').option("header", "true").save(file_location_new)

The file is then written as "part-00000-tid-336943946930983.....csv".

Whereas the goal is to have "Output.csv".

Another approach I looked at was recreating this in Python, but I have not yet found in the documentation how to output the file back to Blob storage.

I know the method to retrieve a file from Blob storage is .get_blob_to_path, per the Microsoft docs.
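
Something like this is what I had in mind with the SDK; it is only a rough sketch assuming the older azure-storage-blob (2.x) BlockBlobService client, and the container name and local paths are placeholders:

from azure.storage.blob import BlockBlobService

## Connect with the same account name and key as above
blob_service = BlockBlobService(account_name=storage_account_name,
                                account_key=storage_account_access_key)

## Download a blob to a local file
blob_service.get_blob_to_path("mycontainer", "Databricks_Files/input/data.csv", "/tmp/data.csv")

## Upload a local file back to Blob storage under a fixed name
blob_service.create_blob_from_path("mycontainer", "Databricks_Files/out/Output.csv", "/tmp/Output.csv")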

Any help here is greatly appreciated.


2 Answers

2 votes

Hadoop/Spark writes the computed result of each partition to its own file in parallel, so you will see many part-<number>-... files in the HDFS output path you named, such as Output/.

If you want to output all results of a computation into one file, you can merge them with the command hadoop fs -getmerge /output1/part* /output2/Output.csv, or reduce the output to a single partition, for example with the coalesce(1) function.
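
If the hadoop CLI is not available in your Databricks notebook, a rough equivalent using dbutils.fs could look like the following sketch (the output path here is a placeholder):

output_dir = "wasbs://container@account.blob.core.windows.net/Databricks_Files/out"

## Find the single part file written by coalesce(1) and copy it to a fixed name
part_files = [f for f in dbutils.fs.ls(output_dir) if f.name.startswith("part-") and f.name.endswith(".csv")]
dbutils.fs.cp(part_files[0].path, output_dir + "/Output.csv")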

So in your scenario, you only need to make sure the coalesce function is called on the DataFrame before the write and save functions, as below.

dfspark.coalesce(1).write.format('com.databricks.spark.csv') \
  .mode('overwrite').option("header", "true").save(file_location_new)
3 votes

coalesce and repartition do not help with saving the DataFrame into one normally named file.

I ended up just renaming the single CSV file and deleting the temporary folder along with its log files:

import os

def save_csv(df, location, filename):
  # write to a temporary folder first; Spark always writes a directory of part files
  outputPath = os.path.join(location, filename + '_temp.csv')

  df.repartition(1).write.format("com.databricks.spark.csv").mode("overwrite") \
    .options(header="true", inferSchema="true").option("delimiter", "\t").save(outputPath)

  # list the written files through the local /dbfs mount
  csv_files = os.listdir(os.path.join('/dbfs', outputPath))

  # move the part-xxxx csv file to the normally named target file
  for file in csv_files:
    if file[-4:] == '.csv':
      dbutils.fs.mv(os.path.join(outputPath, file), os.path.join(location, filename))

  # remove the temporary folder together with its log/metadata files
  dbutils.fs.rm(outputPath, True)

# using save_csv
save_csv_location = 'mnt/.....'
save_csv(df, save_csv_location, 'name.csv')