I am simply trying to write data to Azure SQL Data Warehouse, using Azure Blob Storage for staging.
There is a very straightforward tutorial in the Azure Databricks documentation (azure/sql-data-warehouse), which works if you follow it step by step.
However, in my scenario I have to do the writing from a worker that is executing a foreach.
Here are some links related to the issue:
error-using-pyspark-with-wasb-connecting-pyspark-with-azure-blob
github.com/Azure/mmlspark/issues/456
pyspark-java-io-ioexception-no-filesystem-for-scheme-https
So, this code below WORKS:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Storage account key used for the wasbs:// staging directory
spark.conf.set("fs.azure.account.key.<storageAccountName>.blob.core.windows.net", "myKey")

df = spark.createDataFrame([(1, 2, 3, 4), (5, 6, 7, 8)], ('a', 'b', 'c', 'd'))

(df.write
    .format("com.databricks.spark.sqldw")
    .option("url", "jdbc:sqlserver:...")
    .option("user", "user@server")
    .option("password", "pass")
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("dbTable", "dbo.table_teste")
    .option("tempDir", "wasbs://<container>@<storageAccountName>.blob.core.windows.net/")
    .mode("append")
    .save())
However, it fails when I put the same code inside a foreach, as shown below:
from pyspark.sql.session import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.getOrCreate()

def iterate(row):
    # The code above

dfIter = spark.createDataFrame([(1, 2, 3, 4)], ('a', 'b', 'c', 'd'))
dfIter.rdd.foreach(iterate)
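To be explicit, iterate is just the working snippet inlined, roughly like this (same placeholder credentials and paths as above):
def iterate(row):
    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("fs.azure.account.key.<storageAccountName>.blob.core.windows.net", "myKey")
    df = spark.createDataFrame([(1, 2, 3, 4), (5, 6, 7, 8)], ('a', 'b', 'c', 'd'))
    (df.write
        .format("com.databricks.spark.sqldw")
        .option("url", "jdbc:sqlserver:...")
        .option("user", "user@server")
        .option("password", "pass")
        .option("forwardSparkAzureStorageCredentials", "true")
        .option("dbTable", "dbo.table_teste")
        .option("tempDir", "wasbs://<container>@<storageAccountName>.blob.core.windows.net/")
        .mode("append")
        .save())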
Executing it will generate this exception:
py4j.protocol.Py4JJavaError: An error occurred while calling o54.save. : com.databricks.spark.sqldw.SqlDWConnectorException: Exception encountered in SQL DW connector code.
Caused by: java.io.IOException: No FileSystem for scheme: wasbs
I had the same kind of issue when saving to Delta tables: pyspark-saving-is-not-working-when-called-from-inside-a-foreach
But in that case, I just needed to prepend '/dbfs/' to the Delta table location, so the worker would be able to save it in the right place.
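For reference, the fix in that case looked roughly like this (the path below is just an illustrative placeholder, not my real location):
# Inside the function passed to foreach: saving to the raw path failed,
# but prefixing the Delta location with '/dbfs/' made the worker write to the right place
df.write.format("delta").mode("append").save("/dbfs/mnt/<container>/my_delta_table")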
Based on that, I believe something is missing on the worker, and that is why it is not executing this save properly. Maybe a library that I should set up in the Spark config.
I also looked into the Databricks community forum: save-the-results-of-a-query-to-azure-blo, where they managed to solve the issue by setting this config:
Scala:
sc.hadoopConfiguration.set("fs.wasbs.impl","org.apache.hadoop.fs.azure.NativeAzureFileSystem")
PySpark:
spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl","org.apache.hadoop.fs.azure.NativeAzureFileSystem")
But it didn't work, and I got this exception:
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found
org.apache.hadoop:hadoop-azure:3.2.0 is installed.
Well, any help?