
I am simply trying to write data to Azure SQL Data Warehouse, using Azure Blob Storage for staging.

There is a very straightforward tutorial in the Azure Databricks documentation, azure/sql-data-warehouse, which works if you follow it step by step.

However, in my scenario, I have to do the writing from a worker that is executing a foreach.

Here are some links related to the issue:

error-using-pyspark-with-wasb-connecting-pyspark-with-azure-blob

github.com/Azure/mmlspark/issues/456

pyspark-java-io-ioexception-no-filesystem-for-scheme-https

So, the code below WORKS:

    from pyspark.sql.session import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("fs.azure.account.key.<storageAccountName>.blob.core.windows.net", "myKey")
    df = spark.createDataFrame([(1, 2, 3, 4), (5, 6, 7, 8)], ('a', 'b', 'c', 'd'))

    (df.write
        .format("com.databricks.spark.sqldw")
        .option("url", "jdbc:sqlserver:...")
        .option("user", "user@server")
        .option("password", "pass")
        .option("forwardSparkAzureStorageCredentials", "true")
        .option("dbTable", "dbo.table_teste")
        .option("tempDir", "wasbs://<container>@<storageAccountName>.blob.core.windows.net/")
        .mode("append")
        .save())

However, it fails when I put the code above inside a foreach, like below:

    from pyspark.sql.session import SparkSession
    from pyspark.sql import Row

    spark = SparkSession.builder.getOrCreate()

    def iterate(row):
        # the df.write block shown above goes here, running once per row
        ...

    dfIter = spark.createDataFrame([(1, 2, 3, 4)], ('a', 'b', 'c', 'd'))
    dfIter.rdd.foreach(iterate)

Executing it will generate this exception:

    py4j.protocol.Py4JJavaError: An error occurred while calling o54.save. : com.databricks.spark.sqldw.SqlDWConnectorException: Exception encountered in SQL DW connector code.

    Caused by: java.io.IOException: No FileSystem for scheme: wasbs

I have had the same kind of issue when saving to Delta tables: pyspark-saving-is-not-working-when-called-from-inside-a-foreach

But in that case, I just needed to prepend '/dbfs/' to the Delta table location so the worker could save it to the right place.
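For reference, a minimal sketch of that Delta workaround; the table location is a hypothetical example:

    # Sketch of the Delta fix described above: prefixing the location with
    # '/dbfs/' (the DBFS mount) was enough for the worker to resolve it.
    # '/mnt/delta/table_teste' is a hypothetical path; df is the DataFrame
    # from the first snippet.
    df.write.format("delta").mode("append").save("/dbfs/mnt/delta/table_teste")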

Based on that, I believe something is missing on the worker, and that is why the save is not executed properly. Maybe a library I should add to the Spark config.

I also looked into the Databricks community forum, save-the-results-of-a-query-to-azure-blo, where they managed to solve the issue by setting this config:

Scala:

    sc.hadoopConfiguration.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")

PySpark:

    spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")

But it didn't work and I got this exception:

    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found

org.apache.hadoop:hadoop-azure:3.2.0 is installed.

Well, any help?

Pretty sure you also need the azure-storage jars. – Sage

I thought that for a while too. I tried adding it to the Spark session from inside the worker, like this: ...config('spark.jars.packages', 'com.microsoft.azure:azure-storage:5.2.0')... Databricks 5.4 uses azure-storage 5.2. But it failed. – Flavio Pegas
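Side note: spark.jars.packages is only read when the Spark application starts, so setting it from inside a worker cannot take effect. A minimal sketch of passing both packages up front, using the versions mentioned above (on Databricks, the usual route is to install the libraries on the cluster instead):

    # Sketch: dependency packages must be configured before the session exists;
    # setting spark.jars.packages inside a running worker has no effect.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
        .config("spark.jars.packages",
                "org.apache.hadoop:hadoop-azure:3.2.0,"
                "com.microsoft.azure:azure-storage:5.2.0")
        .getOrCreate())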

1 Answer


I believe your main issue is that you are trying to write from within a foreach loop, which defeats the batching and scaling that the SQL DW connector is designed for. If you really need to write from within the loop and the data volume is not too large, you may be able to do it with the simple JDBC connector (sketch below): https://docs.databricks.com/spark/latest/data-sources/sql-databases.html
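For illustration, a minimal sketch of a plain JDBC write in the style of the linked docs; the server, database, and credentials are placeholders:

    # Sketch of a plain JDBC write; <server> and <db> are placeholders.
    (df.write
        .format("jdbc")
        .option("url", "jdbc:sqlserver://<server>.database.windows.net:1433;database=<db>")
        .option("dbtable", "dbo.table_teste")
        .option("user", "user@server")
        .option("password", "pass")
        .mode("append")
        .save())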

But still note that SQL DW is really optimized for large-scale writes, not for single-row ingestion.
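If the per-row work can be expressed as a DataFrame transformation, one way to keep that large-scale write path is to transform on the workers and issue a single connector write from the driver. A sketch reusing the question's connector options; the added column is a hypothetical stand-in for the per-row logic:

    # Sketch: run the per-row logic as a transformation, then do one
    # batched SQL DW write from the driver instead of a write per row.
    from pyspark.sql import functions as F

    processed = dfIter.withColumn("total", F.col("a") + F.col("b"))  # hypothetical logic
    (processed.write
        .format("com.databricks.spark.sqldw")
        .option("url", "jdbc:sqlserver:...")
        .option("user", "user@server")
        .option("password", "pass")
        .option("forwardSparkAzureStorageCredentials", "true")
        .option("dbTable", "dbo.table_teste")
        .option("tempDir", "wasbs://<container>@<storageAccountName>.blob.core.windows.net/")
        .mode("append")
        .save())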