I am migrating a proof of concept from AWS / EMR to Azure.
It's written in Python and uses Spark, Hadoop and Cassandra on AWS EMR and S3. It calculates Potential Future Exposure for a small set of OTC derivatives.
I have one roadblock at present: How do I save a pyspark dataframe to Azure storage?
With AWS / S3 this is quite simple; on Azure, however, I've yet to make it work. I may be doing something stupid!
I've tested writing plain files to Azure Blob and File storage, but have yet to find guidance on saving DataFrames directly.
On AWS, I currently use the following:
npv_dataframe.coalesce(1).saveAsTextFile(output_dir + '/exposure_scenarios/' + str(counterparty))
where output_dir is in the format s3://s3_bucket_name/directory_name
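(Strictly speaking, saveAsTextFile is an RDD method, so npv_dataframe above is presumably an RDD; for the record, the DataFrame-API equivalent I'm trying to reproduce on Azure would be roughly the following sketch, with the same path layout assumed:)

# Sketch: DataFrame-API equivalent of the RDD write above (same paths assumed).
(npv_dataframe
    .coalesce(1)            # produce a single output part-file
    .write
    .mode('overwrite')
    .csv(output_dir + '/exposure_scenarios/' + str(counterparty)))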
I set up a Data Lake Storage Gen2 storage account and container. I have enabled public access to it.
I have explored various methods, e.g.:
- https://docs.microsoft.com/en-us/python/api/overview/azure/storage-blob-readme?view=azure-python
- https://docs.microsoft.com/en-us/azure/storage/common/storage-samples-python?toc=/azure/storage/blobs/toc.json
- https://docs.databricks.com/_static/notebooks/data-import/azure-blob-store.html
- Write data from pyspark to azure blob? (I believe this question is old, and that Hadoop 3.2.1 ships with abfs support; see the sketch after this list)
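Since Hadoop 3.2.1 bundles the abfs driver, my understanding is that a direct DataFrame write to the Gen2 account should look roughly like this (a sketch only, reusing the #...# placeholders from my test code below and assuming account-key auth against the dfs endpoint):

# Sketch: assumes hadoop-azure 3.2.x on the classpath; #...# values are placeholders.
spark.conf.set('fs.azure.account.key.#myaccount#.dfs.core.windows.net', '#mykey#')
df.write.mode('overwrite').csv('abfss://#mycontainer#@#myaccount#.dfs.core.windows.net/result_csv')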
Some of these examples use a file-upload pattern, but what I want is a direct save from a PySpark DataFrame.
The test code I used was:
import traceback
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

try:
    spark = SparkSession.builder.getOrCreate()
    conf = spark.sparkContext._jsc.hadoopConfiguration()
    conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    spark.conf.set('fs.azure.account.key.#myaccount#.blob.core.windows.net', '#mykey#')
    df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
    df.show()
    df \
        .coalesce(1) \
        .write.format('csv') \
        .option('header', True) \
        .mode('overwrite') \
        .save('wasbs://#mycontainer#@#myaccount#.blob.core.windows.net/result_csv')
    print("Hadoop version: " + spark.sparkContext._gateway.jvm.org.apache.hadoop.util.VersionInfo.getVersion())
except Exception:
    print("Exception occurred")
    print(traceback.format_exc())
The example above fails at df.write; the error is:
py4j.protocol.Py4JJavaError: An error occurred while calling o48.save.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found
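My reading of this ClassNotFoundException is that the hadoop-azure JAR, which provides NativeAzureFileSystem, is not on the classpath at all. An equivalent way to pull it in at session creation rather than on the command line would be the sketch below (the versions are simply the ones I was targeting; note that spark.jars.packages is only honoured before the JVM starts, so it has no effect on an already-running session):

from pyspark.sql import SparkSession

# Must run before any SparkContext exists in this process.
spark = (SparkSession.builder
         .config('spark.jars.packages',
                 'org.apache.hadoop:hadoop-azure:3.2.1,'
                 'com.microsoft.azure:azure-storage:8.6.3')
         .getOrCreate())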
I receive the same error when using spark-submit:
spark-submit --packages org.apache.hadoop:hadoop-azure:3.2.1,com.microsoft.azure:azure-storage:8.6.3 ./test.py
I believe this may be a version-compatibility problem. I noticed that the Hadoop JARs bundled with PySpark were all version 2.7.4, whereas I was referencing the separate Hadoop 3.2.1 installation.
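Note that the Hadoop-version print in my test script sits after the failing write, so it never actually executes; run on its own, it confirms what PySpark loaded:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Prints the hadoop-common version on PySpark's own classpath
# (2.7.4 for me, despite the system-wide 3.2.1 install).
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())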
I am / was using: Java 8 (1.8.0_265), Spark 3.0.0, Hadoop 3.2.1, Python 3.6, Ubuntu 18.04.
I ensured all Hadoop JARs in the Spark directory matched those in the Hadoop JAR directory.
After following another stack-trace error I updated the command to:
spark-submit --packages org.apache.hadoop:hadoop-azure:3.2.1,com.microsoft.azure:azure-storage:8.6.5 test.py
I then received a different Java error, which at first glance looks like a problem with the key:
py4j.protocol.Py4JJavaError: An error occurred while calling o48.save.
: java.lang.NoSuchMethodError: 'org.apache.hadoop.conf.Configuration org.apache.hadoop.security.ProviderUtils.excludeIncompatibleCredentialProviders(org.apache.hadoop.conf.Configuration, java.lang.Class)'
at org.apache.hadoop.fs.azure.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:45)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.getAccountKeyFromConfiguration(AzureNativeFileSystemStore.java:989)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1078)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:543)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1344)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:424)
at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:832)
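In hindsight, this NoSuchMethodError looks less like a key problem and more like mixed Hadoop versions: as far as I can tell, ProviderUtils.excludeIncompatibleCredentialProviders does not exist with that signature in 2.7.x hadoop-common, so hadoop-azure 3.2.1 calling into the bundled 2.7.4 JARs would fail exactly here. If that is right, a version-matched submit, with hadoop-azure pinned to the bundled 2.7.4 and --packages left to resolve the matching azure-storage transitively, would be worth trying:

spark-submit --packages org.apache.hadoop:hadoop-azure:2.7.4 ./test.py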
Also, after adding the Azure account access key to the Hadoop config, if I try:
hdfs dfs -ls wasbs://#mycontainer#@#myaccount#.blob.core.windows.net/
I receive the error: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
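Two guesses about this one: first, for wasbs:// Hadoop expects the TLS variant org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure, not the plain class I set in fs.wasbs.impl above, so the corrected session setting would presumably be:

conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure")

Second, the hdfs CLI has its own classpath: in the stock Hadoop 3.x layout the azure connector sits under share/hadoop/tools/lib and is only picked up once enabled in $HADOOP_HOME/etc/hadoop/hadoop-env.sh:

export HADOOP_OPTIONAL_TOOLS="hadoop-azure"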
Any help appreciated! I'm a bit stuck for ideas. It also seems that, relative to AWS, there are few solved posts about Azure storage / DataFrame issues.
Comments:
Please try the path format wasbs://<container-name>@<storage-account-name>.blob.core.windows.net/<directory-name>. – Jim Xu
Please add spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") to the py file, then run the spark-submit command again. – Jim Xu