
I'm trying to store a Spark DataFrame as a CSV file on Azure Blob Storage from a local Spark cluster.

First, I set the config with the Azure account and account key (I'm not sure which setting is the right one, so I've set all of them):

sparkContext.getConf.set(s"fs.azure.account.key.${account}.blob.core.windows.net", accountKey)
sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.${account}.dfs.core.windows.net", accountKey)
sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.${account}.blob.core.windows.net", accountKey)
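
As far as I can tell, for wasbs:// paths only the blob.core.windows.net key on the Hadoop configuration should matter: the dfs.core.windows.net key belongs to abfs(s)://, and values set on the SparkConf after the context has started are not propagated to the Hadoop layer. A minimal sketch of just that one setting:

// For wasbs://, the wasb driver reads the account key from the Hadoop
// configuration; the dfs.core.windows.net key applies to abfs(s)://.
sparkContext.hadoopConfiguration.set(
  s"fs.azure.account.key.${account}.blob.core.windows.net",
  accountKey
)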

Then I try to store the CSV with the following:

val filePath = s"wasbs://${container}@${account}.blob.core.windows.net/${prefix}/${filename}"
dataFrame.coalesce(1)
  .write.format("csv")
  .options(Map(
    "header" -> (if (hasHeader) "true" else "false"),
    "sep" -> delimiter,
    "quote" -> quote
  ))
  .save(filePath)

But this fails with "Job aborted" and the following stack trace:

org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)

But when I look in the blob container, I can see my file. However, I cannot read it back into a Spark DataFrame: I get the error "Unable to infer schema for CSV. It must be specified manually." and the following stack trace:

org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:185)
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:185)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:184)
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
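
For completeness, the error message asks for a manually specified schema; a minimal sketch of that is below (the two columns are hypothetical placeholders). Note this only bypasses schema inference — it doesn't explain why the written file is unreadable in the first place.

import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

// Hypothetical schema for illustration; match it to the actual columns.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)
))

val readBack = spark.read
  .format("csv")
  .schema(schema)
  .options(Map(
    "header" -> (if (hasHeader) "true" else "false"),
    "sep" -> delimiter,
    "quote" -> quote
  ))
  .load(filePath)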

It seems the problem has already been reported on the Databricks forum.

What is the proper way to store a DataFrame on Azure Blob Storage?


1 Answer


It turns out that, well before the job fails, there is an internal error:

Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
    at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
    at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
    at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
    at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
    ... 18 more

What's happening is that after writing the actual data to a temp file, it tries to move that file to the user-supplied location using CloudBlob.startCopyFromBlob. As always, the Microsoft people broke this by renaming the method to CloudBlob.startCopy.

I'm using "org.apache.hadoop" % "hadoop-azure" % "3.2.1", which is the most recent hadoop-azure release, and it seems to be stuck on the older startCopyFromBlob, so I need to use an old azure-storage version that still has this method, probably 2.x.x.
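
Something like this in build.sbt should do it — I'm assuming here that 2.2.0 is one of the 2.x.x releases that still ships startCopyFromBlob; adjust as needed:

// build.sbt: pin azure-storage to a release that still exposes
// CloudBlob.startCopyFromBlob, which hadoop-azure's wasb driver calls.
libraryDependencies ++= Seq(
  "org.apache.hadoop"   % "hadoop-azure"  % "3.2.1",
  "com.microsoft.azure" % "azure-storage" % "2.2.0" // assumption: a 2.x.x with startCopyFromBlob
)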

See https://github.com/Azure/azure-storage-java/issues/113