
I have created a simple job from a notebook in Azure Databricks, and I am trying to save a Spark DataFrame from the notebook to Azure Blob Storage. Sample code:

import traceback

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Attached the spark submit command used
# spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,
# com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py

# Tried with com.microsoft.azure:azure-storage:2.2.0

SECRET_ACCESS_KEY = "xxxxx"
STORAGE_NAME = "my_storage"
CONTAINER = "my_container"
SUB_PATH = "/azure_dbs_check/"
FILE_NAME = "result"

spark = SparkSession \
    .builder \
    .appName("azure_dbs_to_azure_blob") \
    .getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
df.show()

try:
    spark_context = spark.sparkContext
    fs_acc_key = "fs.azure.account.key." + STORAGE_NAME + ".blob.core.windows.net"

    spark.conf.set("fs.wasbs.impl",
                   "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    spark.conf.set(fs_acc_key, SECRET_ACCESS_KEY)

    file_path = 'wasbs://' + CONTAINER + '@' + STORAGE_NAME + '.blob.core.windows.net' + SUB_PATH + FILE_NAME

    df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")
    print("Written successful")
except Exception as exp:
    print("Exception occurred")
    print(traceback.format_exc())

The above code works when I run it with spark-submit on my local machine. The spark-submit command used is:

spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py

When run as a Databricks job, the write fails. The probable root cause is:

Caused by: java.lang.NoSuchMethodError:
com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob

So I downgraded the package to com.microsoft.azure:azure-storage:2.2.0, which contains the startCopyFromBlob method.
(In the com.microsoft.azure:azure-storage:3.x.x versions, the deprecated startCopyFromBlob() method on CloudBlob has been removed.)

The error remains the same even after downgrading.

The error stack trace is attached below:

    Traceback (most recent call last):
      File "<command-4281470986294005>", line 28, in <module>
        df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")
      File "/databricks/spark/python/pyspark/sql/readwriter.py", line 738, in save
        self._jwrite.save(path)
      File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/databricks/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling o255.save.
    : org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:192)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:110)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:108)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:128)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:116)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
        at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
        at py4j.Gateway.invoke(Gateway.java:295)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:251)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 52, 10.2.3.12, executor 0): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
        at org.apache.spark.scheduler.Task.run(Task.scala:112)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.IllegalStateException: Error closing the output.
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:880)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.close(UnivocityGenerator.scala:85)
        at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CSVFileFormat.scala:193)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 11 more
    Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
        at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
        at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
        ... 19 more

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2243)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
        ... 33 more
    Caused by: org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
        at org.apache.spark.scheduler.Task.run(Task.scala:112)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
    Caused by: java.lang.IllegalStateException: Error closing the output.
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:880)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.close(UnivocityGenerator.scala:85)
        at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CSVFileFormat.scala:193)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 11 more
    Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
        at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
        at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
        ... 19 more

Packages included in spark-submit:

  • org.apache.hadoop:hadoop-azure:2.7.2
  • com.microsoft.azure:azure-storage:3.1.0 (later tried with com.microsoft.azure:azure-storage:2.2.0)

Local machine:
Python 3.6
Spark version 2.4.4, using Scala version 2.11.12

Databricks details:
Cluster information:
5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
Python 3 (3.5)

The Runtime 5.5 release notes say that the package com.microsoft.azure:azure-storage 5.2.0 is already installed in the environment.

Is the problem caused by Spark taking the library from the environment (version 5.2.0) even though another version (2.2.0) is specified in the job? In versions such as 5.2.0, the startCopyFromBlob() method has been removed.
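One way to probe this question is to ask the JVM which JAR a class was actually loaded from. The following is only a sketch: jar_location is a hypothetical helper name, it relies on PySpark's private _jvm attribute, and it assumes the class is visible to the class loader that Class.forName uses on the driver. It inspects the driver only, whereas the failing task in the trace ran on an executor.

```python
def jar_location(jvm, class_name):
    """Return the URL of the JAR that class_name was loaded from, or None.

    jvm is a Py4J JVM view such as spark.sparkContext._jvm (a private
    attribute, so this is an assumption about PySpark internals).
    """
    cls = jvm.java.lang.Class.forName(class_name)
    code_source = cls.getProtectionDomain().getCodeSource()
    if code_source is None:
        # Classes from the bootstrap class loader have no code source.
        return None
    return code_source.getLocation().toString()


# In a notebook with an active `spark` session (not executed here):
# print(jar_location(spark.sparkContext._jvm,
#                    "com.microsoft.azure.storage.blob.CloudBlob"))
```

If the printed location points at the runtime's own azure-storage JAR rather than the version passed to the job, that would support the suspicion above.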

I have documented the various cases/combinations of JARs that I have tried in a Google doc.

Observations:

  1. The Databricks job uses the pre-installed library azure-storage:5.2.0, and this version is fixed. This package does not have the com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob() method (it was replaced by startCopy() in the 4.x.x versions).

  2. So I tried the latest hadoop-azure:3.2.1, hoping to get a JAR that does not call the deprecated method. But this caused a new error:
    java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StreamCapabilities

  3. The StreamCapabilities class is in the hadoop-common package, so I included the latest hadoop-common (3.2.1).
    This caused java.lang.NoSuchMethodError: org.apache.hadoop.security.ProviderUtils.excludeIncompatibleCredentialProviders().
    Reason:
    org.apache.hadoop:hadoop-common:2.7.3 is pre-installed on the Azure runtime, and hadoop-common:2.7.3 does not have the ProviderUtils.excludeIncompatibleCredentialProviders() method.

  4. Since both packages (hadoop-common:2.7.3 and azure-storage:5.2.0) are fixed (pre-installed), I tried lower hadoop-azure versions, looking for one that does not call the excludeIncompatibleCredentialProviders() method.

  5. From hadoop-azure:3.2.1 (the latest as of Nov 2019) down to hadoop-azure:2.8.0, excludeIncompatibleCredentialProviders() is called internally.
    Below 2.8.0, I start getting the old error again:
    NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob
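The observations above can be summarized as a small lookup. This is only a restatement of what I saw with the two pre-installed packages held fixed, not something checked against the JARs themselves; parse_version and expected_failure are hypothetical names used for illustration.

```python
# Pre-installed on Databricks Runtime 5.5 LTS, per the observations above.
RUNTIME_HADOOP_COMMON = "2.7.3"
RUNTIME_AZURE_STORAGE = "5.2.0"


def parse_version(text):
    """Turn '2.7.2' into (2, 7, 2) so versions compare numerically."""
    return tuple(int(part) for part in text.split("."))


def expected_failure(hadoop_azure_version):
    """Return the error observed for a given hadoop-azure version.

    Assumes hadoop-common and azure-storage stay at the runtime versions
    above. (For 3.2.1 the first error seen was the missing
    StreamCapabilities class; this models the state after that was worked
    around, as described in observation 5.)
    """
    if parse_version(hadoop_azure_version) < (2, 8, 0):
        # Older hadoop-azure calls a method missing from azure-storage 5.2.0.
        return "NoSuchMethodError: CloudBlob.startCopyFromBlob"
    # 2.8.0 and later call a method missing from hadoop-common 2.7.3.
    return "NoSuchMethodError: ProviderUtils.excludeIncompatibleCredentialProviders"


for version in ("2.7.2", "2.8.0", "3.2.1"):
    print(version, "->", expected_failure(version))
```

In other words, every hadoop-azure version I tried fails against one or the other of the fixed packages.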


1 Answer


One alternative is to create a mount:

https://docs.databricks.com/data/data-sources/azure/azure-storage.html

And then adjust the save path as necessary.
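A minimal sketch of what the mount could look like, assuming the dbutils.fs.mount call described on the linked page and the account-key style of authentication used in the question. build_mount_args, mount_container and the mount name are hypothetical names for illustration; dbutils is only available inside a Databricks notebook, so it is passed in explicitly.

```python
def build_mount_args(storage_account, container, access_key, mount_name):
    """Build the keyword arguments for dbutils.fs.mount (account-key auth)."""
    host = "{}.blob.core.windows.net".format(storage_account)
    return {
        "source": "wasbs://{}@{}".format(container, host),
        "mount_point": "/mnt/{}".format(mount_name),
        "extra_configs": {"fs.azure.account.key.{}".format(host): access_key},
    }


def mount_container(dbutils, storage_account, container, access_key, mount_name):
    """Mount the container and return the mount point to write under."""
    args = build_mount_args(storage_account, container, access_key, mount_name)
    dbutils.fs.mount(**args)
    return args["mount_point"]


# In a Databricks notebook (not executed here):
# mount_point = mount_container(dbutils, STORAGE_NAME, CONTAINER,
#                               SECRET_ACCESS_KEY, "blob_out")
# df.write.save(mount_point + SUB_PATH + FILE_NAME + "_csv",
#               format="csv", header=True, mode="overwrite")
```

The save path then becomes a /mnt/... path instead of a wasbs:// URL.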


I'd also recommend using this:

spark.conf.set(
  "fs.azure.account.key.<storage-account-name>.blob.core.windows.net",
  "<storage-account-access-key>")

instead of

spark_context._jsc.hadoopConfiguration().set(fs_acc_key, SECRET_ACCESS_KEY)

since you are using the DataFrame API rather than the RDD API.


Edit

I ran the following code on a Databricks Community cluster, with the spark.conf.set statements modified.

import traceback

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Attached the spark submit command used
# spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,
# com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py

# Tried with com.microsoft.azure:azure-storage:2.2.0

SECRET_ACCESS_KEY = "ACCESSKEY"
STORAGE_NAME = "ACCOUNTNAME"
CONTAINER = "CONTAINER"
SUB_PATH = "/azure_dbs_check/"
FILE_NAME = "result"

spark = SparkSession \
    .builder \
    .appName("azure_dbs_to_azure_blob") \
    .getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
df.show()

try:
    fs_acc_key = "fs.azure.account.key." + STORAGE_NAME + ".blob.core.windows.net"

    spark.conf.set("spark.hadoop.fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    spark.conf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    spark.conf.set(fs_acc_key, SECRET_ACCESS_KEY)

    file_path = 'wasbs://' + CONTAINER + '@' + STORAGE_NAME + '.blob.core.windows.net' + SUB_PATH + FILE_NAME

    print(file_path)

    df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")

    print("Written successful")
except Exception as exp:
    print("Exception occurred")
    print(traceback.format_exc())

Databricks cmd output

Azure Storage Account