1
votes

I'm trying to write as Spark DF as a DeltaTable. It's working fine in my IDE Intelliji , But with the same dependencies and versions it's not working in my spark REPL(Spark shell)

Spark Version :2.4.0 Scala Version :2.11.8

Dependencies in Intelliji (Dependencies for whole project , Kindly ignore relevant)

    compile 'org.scala-lang:scala-library:2.11.8'
    compile 'org.scala-lang:scala-reflect:2.11.8'
    compile 'org.scala-lang:scala-compiler:2.11.8'
    compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.1.2'
    compile 'org.scala-lang.modules:scala-swing_2.11:2.0.3'
    compile 'org.apache.spark:spark-mllib_2.11:2.4.0'
    compile 'org.apache.spark:spark-sql_2.11:2.4.0'
    compile 'org.apache.spark:spark-graphx_2.11:2.4.0'
    compile 'org.apache.spark:spark-launcher_2.11:2.4.0'
    compile 'org.apache.spark:spark-catalyst_2.11:2.4.0'
    compile 'org.apache.spark:spark-streaming_2.11:2.4.0'
    compile group: 'io.delta', name: 'delta-core_2.11', version: '0.5.0'
    compile 'org.apache.spark:spark-core_2.11:2.4.0'
    compile 'org.apache.spark:spark-hive_2.11:2.4.0'
    compile 'com.databricks:spark-avro_2.11:4.0.0'
    compile 'org.apache.avro:avro-mapred:1.8.2'
    compile 'org.apache.avro:avro:1.8.2'
    compile 'org.apache.avro:avro-compiler:1.8.2'
    compile group: 'mysql', name: 'mysql-connector-java', version: '8.0.15'
    compile group: 'commons-io', name: 'commons-io', version: '2.5'
    testCompile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.26'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    testCompile group: 'org.scalatest', name: 'scalatest_2.12', version: '3.2.0-SNAP10'
    compile group: 'javax.mail', name: 'javax.mail-api', version: '1.6.2'
    compile group: 'com.sun.mail' ,name: 'javax.mail', version: '1.6.0'
    compile 'com.hortonworks:shc-core:1.1.1-2.1-s_2.11'
    compile 'com.hortonworks:shc:1.1.1-2.1-s_2.11'
    compile group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-server', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-common', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase', version: '1.2.5', ext: 'pom'
    compile group: 'org.apache.hbase', name: 'hbase-protocol', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-hadoop2-compat', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-annotations', version: '1.2.5'

    // jackson modues
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.8.6'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.10.0'
    compile group: 'org.codehaus.jackson', name: 'jackson-core-asl', version: '1.9.13'
    compile group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version: '1.9.13'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.8.7'
    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.8.6'
    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.8.6'
    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jaxb-annotations', version: '2.8.6'
    compile group: 'org.json4s', name: 'json4s-jackson_2.11', version: '3.2.10'
    compile group: 'com.twitter', name: 'parquet-jackson', version: '1.6.0'
    compile group: 'org.codehaus.jackson', name: 'jackson-jaxrs', version: '1.9.13'
    compile group: 'org.codehaus.jackson', name: 'jackson-xc', version: '1.9.13'
    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-paranamer', version: '2.8.6'
    compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-annotations', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-auth', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-yarn-common', version: '2.7.3'

Piece of code which i'm trying to execute

import io.delta._

val dF=spark.read.load("path") //parquet file
dF.write.format("delta").mode("overwrite").partitionBy("topic","partition","key").save("path") // delta table

spark-shell Command used:

spark-shell --packages com.fasterxml.jackson.core:jackson-databind:2.8.6,com.fasterxml.jackson.core:jackson-core:2.10.0,org.codehaus.jackson:jackson-core-asl:1.9.13,org.codehaus.jackson:jackson-mapper-asl:1.9.13,com.fasterxml.jackson.core:jackson-annotations:2.8.7,com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.8.6,com.fasterxml.jackson.module:jackson-module-scala_2.11:2.8.6,com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.8.6,com.twitter:parquet-jackson:1.6.0,org.codehaus.jackson:jackson-jaxrs:1.9.13,org.codehaus.jackson:jackson-xc:1.9.13,com.fasterxml.jackson.module:jackson-module-paranamer:2.8.6,io.delta:delta-core_2.11:0.5.0,commons-io:commons-io:2.5

Error in REPL:

Exception in thread "main" java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse$default$3()Z
    at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:127)
    at org.apache.spark.sql.delta.actions.Metadata$$anonfun$schema$1.apply(actions.scala:202)
    at org.apache.spark.sql.delta.actions.Metadata$$anonfun$schema$1.apply(actions.scala:201)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.sql.delta.actions.Metadata.schema$lzycompute(actions.scala:201)
    at org.apache.spark.sql.delta.actions.Metadata.schema(actions.scala:200)
    at org.apache.spark.sql.delta.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:61)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:45)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:85)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:65)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:396)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:133)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
    at org.controller.deltaLakeEG.deltaLakeHadoopEg$.main(deltaLakeHadoopEg.scala:29)
    at org.controller.deltaLakeEG.deltaLakeHadoopEg.main(deltaLakeHadoopEg.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

4

4 Answers

4
votes

As per the official documentation:

Delta Lake requires Apache Spark version 2.4.2 or above

Please upgrade your Spark version to at least 2.4.2 in IntelliJ IDEA (or issues show up). The latest version as of this writing is 3.1.1, but that's not supported yet (April, 7th):

We have upgraded Spark to 3.1.1 in master branch. We are still working on some items before doing a release.

As per the official documentation:

Run spark-shell with the Delta Lake package:

bin/spark-shell --packages io.delta:delta-core_2.11:0.8.0

From myself, use --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension to enable Delta Lake's SQL commands, e.g. DESCRIBE DETAIL, GENERATE.

The entire command to run spark-shell with Delta Lake 0.8.0 should be as follows:

./bin/spark-shell \
  --packages io.delta:delta-core_2.12:0.8.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
1
votes
bin/spark-shell --packages io.delta:delta-core_2.11:0.6.1

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath("/tmp/delta-table")
0
votes

Spark itself has a dependency on Jackson, and the version you're instructing spark-shell to use is incompatible. Per https://github.com/apache/spark/blob/v2.4.0/pom.xml, 2.4.0 uses Jackson 2.6.7. Is there a particular reason that you need Jackson 2.10 in this case?

0
votes

Please check here: https://docs.delta.io/0.8.0/quick-start.html

To run the spark-shell use the command

spark-shell --packages io.delta:delta-core_2.12:0.8.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"