0 votes

I have an Azure Data Lake Gen1 and an Azure Data Lake Gen2 (Blob Storage with hierarchical namespace), and I am trying to create a Databricks notebook (Scala) that reads two files and writes a new file back into the Data Lake. In both Gen1 and Gen2 I am experiencing the same issue: the file name I specify for the output CSV gets created as a directory, and inside that directory it writes four files: "committed", "started", "_SUCCESS", and "part-00000-tid-".

[Databricks output screenshot]

For the life of me, I can't figure out why it's doing this instead of actually saving the CSV to that location. Here's an example of the code I've written. If I do a .show() on the df_join DataFrame, it outputs the correct-looking results, but the .write is not working correctly.

val df_names = spark.read.option("header", "true").csv("/mnt/datalake/raw/names.csv")
val df_addresses = spark.read.option("header", "true").csv("/mnt/datalake/raw/addresses.csv")

val df_join = df_names.join(df_addresses, df_names.col("pk") === df_addresses.col("namepk"))


df_join.write
.format("com.databricks.spark.csv")
.option("header", "true")
.mode("overwrite")
.save("/mnt/datalake/reports/testoutput.csv")

3 Answers

1 vote

If I understand your needs correctly, you just want to write the Spark DataFrame data to a single CSV file named testoutput.csv in Azure Data Lake, not to a directory named testoutput.csv containing partition files.

You cannot do that directly with Spark functions like DataFrameWriter.save, because the DataFrame writer writes data through the HDFS layer backed by Azure Data Lake, and HDFS persists the data as a directory with the name you supplied plus a set of partition files. See the HDFS documentation, such as The Hadoop FileSystem API Definition, for details.

Then, in my experience, you can use the Azure Data Lake SDK for Java within your Scala program to write the DataFrame's data directly to Azure Data Lake as a single file. You can refer to the samples at https://github.com/Azure-Samples?utf8=%E2%9C%93&q=data-lake&type=&language=java.
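
For illustration, here is a minimal Scala sketch of that idea, assuming the ADLS Gen1 Java SDK (com.microsoft.azure:azure-data-lake-store-sdk) is attached to the cluster and a service principal is used for authentication. The tenant, client, account name, and output path below are placeholders, and collecting the DataFrame to the driver is only reasonable for small result sets.

import com.microsoft.azure.datalake.store.{ADLStoreClient, IfExists}
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider
import java.io.PrintStream

// Placeholder service-principal credentials -- replace with your own values
val tokenProvider = new ClientCredsTokenProvider(
  "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
  "<client-id>",
  "<client-secret>")
val adlsClient = ADLStoreClient.createClient("<youraccount>.azuredatalakestore.net", tokenProvider)

// Collect the joined DataFrame on the driver and write it out as one CSV file
// (naive CSV formatting: no quoting or escaping)
val header = df_join.columns.mkString(",")
val rows = df_join.collect().map(_.mkString(","))

val out = new PrintStream(adlsClient.createFile("/reports/testoutput.csv", IfExists.OVERWRITE))
out.println(header)
rows.foreach(out.println)
out.close()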

1 vote

The reason it's creating a directory with multiple files is that each partition is saved and written to the data lake individually. To save a single output file you need to coalesce (or repartition) your DataFrame down to one partition.

Let's use the dataframe API

confKey = "fs.azure.account.key.srcAcctName.blob.core.windows.net"
secretKey = "==" # your secret key
spark.conf.set(confKey, secretKey)
blobUrl = "wasbs://<container-name>@srcAcctName.blob.core.windows.net"

Coalesce your dataframe

(df_join.coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .mode("overwrite")
    .save(blobUrl + "/reports/"))

Change the file name

files = dbutils.fs.ls(blobUrl + '/reports/')
output_file = [x for x in files if x.name.startswith("part-")]
dbutils.fs.mv(output_file[0].path, "%s/reports/testoutput.csv" % (blobUrl))
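
If you want to stay in Scala and keep using the /mnt/datalake mount from the question, the same coalesce-then-rename pattern looks roughly like this (a sketch; the temporary directory name is just an example, and dbutils is available in Databricks Scala notebooks as well):

// Write a single partition to a temporary directory on the mount
df_join.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .mode("overwrite")
  .save("/mnt/datalake/reports/tmp_testoutput")

// Locate the single part file and rename it to the desired CSV name
val partFile = dbutils.fs.ls("/mnt/datalake/reports/tmp_testoutput")
  .filter(_.name.startsWith("part-"))
  .head
dbutils.fs.mv(partFile.path, "/mnt/datalake/reports/testoutput.csv")

// Clean up the temporary directory
dbutils.fs.rm("/mnt/datalake/reports/tmp_testoutput", true)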
0 votes

Try this (it converts the Spark DataFrame to a pandas DataFrame on the driver, so the data must fit in driver memory):

df_join.toPandas().to_csv('/dbfs/mnt/....../df.csv', sep=',', header=True, index=False)