4 votes

The Dataproc cluster is created with image 2.0.x and the Delta Lake package io.delta:delta-core_2.12:0.7.0.

Spark version is 3.1.1

The PySpark shell is started with:

pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
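
For reference, the same two settings can be applied programmatically when building the session. A minimal sketch, assuming delta-core is already on the cluster's classpath as described above (the app name is arbitrary, not from the original setup):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("delta-poc")  # arbitrary name for this sketch
         # Register Delta's SQL extensions (DELTA syntax, MERGE, etc.)
         .config("spark.sql.extensions",
                 "io.delta.sql.DeltaSparkSessionExtension")
         # Route the built-in catalog through Delta's catalog implementation
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())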

SQL statements executed to create the Delta table and insert a row:

spark.sql("""CREATE TABLE IF NOT EXISTS customer(
             c_id Long, c_name String, c_city String
             )
           USING DELTA LOCATION 'gs://edw-bi-dev-dataexports/delta-table-poc/dt_poc/customer'
         """)

spark.sql("INSERT INTO customer VALUES(1, 'Shawn', 'Tx')")

Error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 719, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o58.sql.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias.<init>(Lorg/apache/spark/sql/catalyst/expressions/Expression;Ljava/lang/String;Lorg/apache/spark/sql/catalyst/expressions/ExprId;Lscala/collection/Seq;Lscala/Option;)V
        at org.apache.spark.sql.delta.DeltaAnalysis.$anonfun$normalizeQueryColumns$1(DeltaAnalysis.scala:162)
        at scala.collection.immutable.List.map(List.scala:293)
        at org.apache.spark.sql.delta.DeltaAnalysis.org$apache$spark$sql$delta$DeltaAnalysis$$normalizeQueryColumns(DeltaAnalysis.scala:151)
        at org.apache.spark.sql.delta.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:49)
        at org.apache.spark.sql.delta.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:45)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:221)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:45)
        at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:40)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:195)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:189)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:154)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:173)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:228)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:172)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:73)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:71)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:63)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
        at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)

I am not able to figure out the root cause of the problem here.


2 Answers

6 votes

It's caused by a change in Spark 3.1 that broke binary compatibility of the Alias case class: Delta 0.7.0 was compiled against the Spark 3.0 constructor, which no longer exists in 3.1.1, hence the NoSuchMethodError. The fix is to either downgrade Spark to 3.0.x, or wait until a new Delta version is released with support for 3.1.x.

P.S. There are other places in Delta that were broken by changes in Spark 3.1.1.

Update (May 2021): Delta Lake 1.0.0 is now fully compatible with Spark 3.1.
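
With that release the original invocation should work unchanged apart from the package version; a sketch mirroring the question's command (not retested here):

pyspark --packages io.delta:delta-core_2.12:1.0.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"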

0 votes

It seems that the issue is related to the Spark SQL catalog setting. I tried the same query and it works when I remove this conf:

--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

Tested on Dataproc image 2.0.1-debian10 with:

pyspark --packages io.delta:delta-core_2.12:0.7.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"