I have a 3-node Spark cluster and am trying to access Snowflake using the Snowflake Spark connector and JDBC driver:

- JDBC driver: snowflake-jdbc-3.12.4.jar
- Spark connector: spark-snowflake_2.11-2.7.0-spark_2.4.jar
Here is my code:
```python
sfOptions = {
    "sfURL": "{}.snowflakecomputing.com".format(ACCOUNT_NAME),
    "sfUser": "{}@fmr.com".format(USER_ID),
    "sfAccount": "{}".format(ACCOUNT_ID),
    "sfRole": "{}".format(DEFAULT_ROLE),
    "sfAuthenticator": "oauth",
    "sfToken": "{}".format(oauth_token),
    "sfDatabase": "{}".format(DATABASE),
    "sfSchema": "{}".format(SCHEMA),
    "sfWarehouse": "{}".format(WAREHOUSE),
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# ...

conf = (SparkConf()
        .setMaster("spark://<master-url>")
        .setAppName("Spark-Snowflake-Connector"))

spark = (SparkSession.builder
         .config(conf=conf)
         .enableHiveSupport()
         .getOrCreate())

# Enable query pushdown to Snowflake
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate()
)

sdf = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "select * from TIME_AGE") \
    .load()

sdf.show()
```
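As an aside, the `"{}".format(...)` wrappers in `sfOptions` are redundant when the config values are already strings; only `sfURL` and `sfUser` actually compose anything. A minimal sketch of the same dict built directly (`build_sf_options` and the placeholder values are mine, not from the real job):

```python
def build_sf_options(account_name, user_id, account_id, role,
                     oauth_token, database, schema, warehouse):
    # Hypothetical helper: builds the connector options dict.
    # Only sfURL and sfUser need formatting; the rest pass through as-is.
    return {
        "sfURL": "{}.snowflakecomputing.com".format(account_name),
        "sfUser": "{}@fmr.com".format(user_id),
        "sfAccount": account_id,
        "sfRole": role,
        "sfAuthenticator": "oauth",
        "sfToken": oauth_token,
        "sfDatabase": database,
        "sfSchema": schema,
        "sfWarehouse": warehouse,
    }

# Placeholder values for illustration only
opts = build_sf_options("myacct", "jdoe", "myacct01", "ANALYST",
                        "tok123", "DB", "PUBLIC", "WH")
print(opts["sfURL"])  # myacct.snowflakecomputing.com
```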
My call failed on sdf.show() with the following exception. Any suggestions?
```
20/04/26 09:54:55 INFO DAGScheduler: Job 0 failed: showString at NativeMethodAccessorImpl.java:0, took 5.494100 s
Traceback (most recent call last):
  File "/fedata/a393831/snowflake/spark-driver.py", line 114, in <module>
    sdf.show()
  File "/apps/shared/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
  File "/apps/shared/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/apps/shared/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/apps/shared/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o68.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.240.62.46, executor 0): java.lang.NullPointerException
    at net.snowflake.client.core.SFArrowResultSet.getObject(SFArrowResultSet.java:570)
    at net.snowflake.client.jdbc.SnowflakeResultSetV1.getObject(SnowflakeResultSetV1.java:336)
    at net.snowflake.spark.snowflake.io.ResultIterator$$anonfun$2.apply(SnowflakeResultSetRDD.scala:115)
    at net.snowflake.spark.snowflake.io.ResultIterator$$anonfun$2.apply(SnowflakeResultSetRDD.scala:114)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at net.snowflake.spark.snowflake.io.ResultIterator.next(SnowflakeResultSetRDD.scala:114)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:256)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
```