1 vote

I have a 3-node Spark cluster and am trying to access Snowflake using the Snowflake Spark connector and JDBC driver:

JDBC driver: snowflake-jdbc-3.12.4.jar
Spark connector: spark-snowflake_2.11-2.7.0-spark_2.4.jar

Here is my code:

sfOptions = {
  "sfURL" : "{}.snowflakecomputing.com".format(ACCOUNT_NAME),
  "sfUser" : "{}@fmr.com".format(USER_ID),
  "sfAccount" : "{}".format(ACCOUNT_ID),
  "sfRole" : "{}".format(DEFAULT_ROLE),
  "sfAuthenticator" : "oauth",
  "sfToken" : "{}".format(oauth_token),
  "sfDatabase" : "{}".format(DATABASE),
  "sfSchema" : "{}".format(SCHEMA),
  "sfWarehouse" : "{}".format(WAREHOUSE)
}
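As an aside, the `"{}".format(...)` wrappers are redundant when the value is already a string; `.format()` is only needed where a larger string is being composed, such as the URL. A minimal sketch of the same dict (the placeholder values here are hypothetical, standing in for the real account settings):

```python
# Hypothetical placeholder values stand in for the real account settings.
ACCOUNT_NAME = "myaccount"
USER_ID = "someuser"
oauth_token = "dummy-token"

sfOptions = {
    # .format() is only needed when composing a larger string:
    "sfURL": "{}.snowflakecomputing.com".format(ACCOUNT_NAME),
    "sfUser": "{}@fmr.com".format(USER_ID),
    "sfAuthenticator": "oauth",
    # A plain string value can be passed directly, no wrapper needed:
    "sfToken": oauth_token,
}
```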

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
....
conf = (SparkConf()
    .setMaster("spark://<master-url>")
    .setAppName("Spark-Snowflake-Connector")
    )


spark = (SparkSession.builder.config(conf=conf)
    .enableHiveSupport()
    .getOrCreate())
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils \
    .enablePushdownSession(
        spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())


sdf = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
   .options(**sfOptions) \
   .option("query",  "select * from TIME_AGE") \
   .load()
sdf.show()

My call failed on sdf.show() with the following exception. Any suggestions?

20/04/26 09:54:55 INFO DAGScheduler: Job 0 failed: showString at NativeMethodAccessorImpl.java:0, took 5.494100 s
Traceback (most recent call last):
  File "/fedata/a393831/snowflake/spark-driver.py", line 114, in
    sdf.show()
  File "/apps/shared/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
  File "/apps/shared/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in call
  File "/apps/shared/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/apps/shared/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o68.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.240.62.46, executor 0): java.lang.NullPointerException
    at net.snowflake.client.core.SFArrowResultSet.getObject(SFArrowResultSet.java:570)
    at net.snowflake.client.jdbc.SnowflakeResultSetV1.getObject(SnowflakeResultSetV1.java:336)
    at net.snowflake.spark.snowflake.io.ResultIterator$$anonfun$2.apply(SnowflakeResultSetRDD.scala:115)
    at net.snowflake.spark.snowflake.io.ResultIterator$$anonfun$2.apply(SnowflakeResultSetRDD.scala:114)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at net.snowflake.spark.snowflake.io.ResultIterator.next(SnowflakeResultSetRDD.scala:114)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:256)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)

Comments:

How big is TIME_AGE? Can you try a subset of the table? – abiratsis
It only has about 400 rows and 5 columns. – Ray Zhang
Are you using YARN for your Spark app? – SunnyAk

2 Answers

1 vote

Looks like there is an issue with the Snowflake JDBC 3.12.4 jar when used with the Spark connector spark-snowflake_2.11-2.7.0-spark_2.4.jar. Can you try version 3.12.3 of the Snowflake JDBC driver? That works well with the above Spark connector version.
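If you fetch the jars via `--packages` instead of copying them onto each node by hand, the downgrade can be pinned at submit time. A sketch of the submit command under that assumption (the explicit snowflake-jdbc coordinate overrides the transitive dependency the connector would otherwise pull in; `<master-url>` and the script name come from the question):

```shell
# Pin the Spark connector together with the 3.12.3 JDBC driver.
spark-submit \
  --master spark://<master-url> \
  --packages net.snowflake:spark-snowflake_2.11:2.7.0-spark_2.4,net.snowflake:snowflake-jdbc:3.12.3 \
  spark-driver.py
```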

0
votes

I had the same issue with the same connector and driver configuration. My query simply counted the rows of the Snowflake sample table snowflake_sample_data.tpch_sf1.lineitem:

"sfDatabase" -> "snowflake_sample_data",
"sfSchema" -> "tpch_sf1",
"query" -> "select count(*) from lineitem"

So I tried JDBC driver version 3.12.0 and it works, so it appears to be a regression in the newer driver release.
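Since the NullPointerException is thrown inside SFArrowResultSet, another avenue worth trying if you cannot change driver versions is to fall back to the older JSON result format via Snowflake's JDBC_QUERY_RESULT_FORMAT session parameter. That parameter is real, but whether the Spark connector forwards it through sfOptions as shown below is an assumption, not verified behavior:

```python
# Sketch: avoid the Arrow result-set code path where the NPE is thrown
# by requesting the JSON result format. Passing the session parameter
# through sfOptions like this is an assumption; the sfOptions values
# here are hypothetical placeholders.
sfOptions = {
    "sfURL": "myaccount.snowflakecomputing.com",
    "sfUser": "someuser@fmr.com",
}
sfOptions["jdbc_query_result_format"] = "json"
```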