I have the following situation:
- Delta table located in S3
- I want to query this table via Athena
- Spark version 3.1.1 and Hadoop 3.2.0
To do this, I need to follow the docs: instructions and s3 setup
I am using a MacBook Pro, with the following environment variables configured in my ~/.zshrc for my small POC:
export PYSPARK_PYTHON=<poetry_python_path>/bin/python3
export PYSPARK_DRIVER=<poetry_python_path>/bin/python3
export JAVA_HOME="/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home"
export SPARK_HOME=<poetry_python_path>/site-packages/pyspark
export PYARROW_IGNORE_TIMEZONE=1
I set up a small PySpark project, where I create my Spark session:
from pyspark.sql import SparkSession
import findspark
import boto3

def create_session() -> SparkSession:
    findspark.init()
    spark_session = SparkSession.builder.appName("delta_session") \
        .master("local[*]") \
        .getOrCreate()
    sparkContext = spark_session.sparkContext
    boto_default_session = boto3.setup_default_session()
    boto_session = boto3.Session(
        botocore_session=boto_default_session, profile_name="dev", region_name="eu-west-1"
    )
    credentials = boto_session.get_credentials()
    print(
        f"Hadoop version = {sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}"
    )
    hadoopConfiguration = sparkContext._jsc.hadoopConfiguration()
    hadoopConfiguration.set(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.profile.ProfileCredentialsProvider"
    )
    hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    hadoopConfiguration.set("fs.s3a.awsAccessKeyId", credentials.access_key)
    hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", credentials.secret_key)
    hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
    return spark_session
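For comparison, the Hadoop S3A connector documents fs.s3a.access.key and fs.s3a.secret.key (plus fs.s3a.session.token for temporary credentials) as its static credential properties. The following is a minimal sketch of collecting those settings as a plain dict; the helper name s3a_options is hypothetical, and the spark.hadoop. prefix is assumed so the entries can be handed to SparkSession.builder.config(...). It only illustrates the property names and is not meant as a fix for the error described further down.

```python
from typing import Dict, Optional


def s3a_options(access_key: str, secret_key: str,
                session_token: Optional[str] = None) -> Dict[str, str]:
    """Build Spark config entries for the S3A connector (hypothetical helper)."""
    options = {
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
    }
    if session_token:
        # Temporary (STS) credentials need the token and a matching provider.
        options["spark.hadoop.fs.s3a.session.token"] = session_token
        options["spark.hadoop.fs.s3a.aws.credentials.provider"] = (
            "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"
        )
    return options
```

Each key/value pair could then be applied with builder.config(key, value) before getOrCreate() is called.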
I then run:
spark_session = create_session()
from delta.tables import *
delta_table = DeltaTable.forPath(spark_session, "s3a://<my-path-to-delta-table>")
# This works
df = delta_table.toDF()
print(df.show(10))
# This fails
delta_table.generate("symlink_format_manifest")
I am able to retrieve the delta files and create a DataFrame, and all looks good. I then try to call delta_table.generate and I get this error:
Traceback (most recent call last):
  File "/run.py", line 33, in <module>
    delta_table.generate("symlink_format_manifest")
  File "/private/var/folders/c8/sj3rz_k14cs58nqwr3m9zsxc0000gq/T/spark-ba2ce53e-c9f8-49d4-98d5-21d9581b05f4/userFiles-b6d820f0-4e96-4e27-8808-a14b9e93928a/io.delta_delta-core_2.12-0.7.0.jar/delta/tables.py", line 74, in generate
  File "<poetry_python_path>/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "<poetry_python_path>/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "<poetry_python_path>/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.generate.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default$6()Z
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generatePartitionPathExpression$1(GenerateSymlinkManifest.scala:350)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression(GenerateSymlinkManifest.scala:349)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression$(GenerateSymlinkManifest.scala:345)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generatePartitionPathExpression(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.withRelativePartitionDir(GenerateSymlinkManifest.scala:338)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.writeManifestFiles(GenerateSymlinkManifest.scala:262)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generateFullManifest$1(GenerateSymlinkManifest.scala:180)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.withStatusCode(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$recordManifestGeneration$1(GenerateSymlinkManifest.scala:365)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordOperation(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordDeltaOperation(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.recordManifestGeneration(GenerateSymlinkManifest.scala:364)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest(GenerateSymlinkManifest.scala:167)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest$(GenerateSymlinkManifest.scala:165)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generateFullManifest(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$1(DeltaGenerateCommand.scala:58)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$1$adapted(DeltaGenerateCommand.scala:58)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand.run(DeltaGenerateCommand.scala:50)
    at io.delta.tables.execution.DeltaTableOperations.executeGenerate(DeltaTableOperations.scala:54)
    at io.delta.tables.execution.DeltaTableOperations.executeGenerate$(DeltaTableOperations.scala:48)
    at io.delta.tables.DeltaTable.executeGenerate(DeltaTable.scala:45)
    at io.delta.tables.DeltaTable.generate(DeltaTable.scala:176)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
I call the application with:
poetry run spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" run.py
What I tried:
- I have tried running it without poetry, directly downloading Spark and doing it that way
- I tried with an older hadoop version since they seem to use that here
- I found this thread but it did not help me
- I have also tried io.delta:delta-core_2.12:0.8.0
- I have verified that delta versions 0.7.0 and 0.8.0 should support spark 3.1.1
- I also tried adding pyarrow and setting it via: spark_session.conf.set("spark.sql.execution.arrow.enabled", "true")
- I have also tried to add hadoop-common 3.2.0 with --packages org.apache.hadoop:hadoop-common:3.2.0, but that did not help either
- I also tried running it with spark 3.1.1 and hadoop 3.2.0 but I gave it --packages "io.delta:delta-core_2.12:0.7.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:2.7.7" instead, but that gave me the error:
py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
: java.lang.NumberFormatException: For input string: "100M"
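One plausible reading of the "100M" error, stated here as an assumption rather than a confirmed diagnosis, is that a hadoop-aws 2.7.x artifact was combined with Hadoop 3.2.0 libraries; hadoop-aws is generally expected to match the Hadoop version on the classpath. A minimal sketch that flags such a mismatch in a --packages string (both function names are hypothetical):

```python
from typing import Optional


def hadoop_aws_version(packages: str) -> Optional[str]:
    """Return the hadoop-aws version from a --packages string, if present."""
    for coordinate in packages.split(","):
        parts = coordinate.strip().split(":")
        # Maven coordinates have the form group:artifact:version.
        if len(parts) == 3 and parts[:2] == ["org.apache.hadoop", "hadoop-aws"]:
            return parts[2]
    return None


def matches_hadoop(packages: str, hadoop_version: str) -> bool:
    """True when hadoop-aws is absent or equals the given Hadoop version."""
    found = hadoop_aws_version(packages)
    return found is None or found == hadoop_version
```

Called with the --packages value from the last attempt and "3.2.0", matches_hadoop returns False, because hadoop-aws is pinned to 2.7.7.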
It looks to me like org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default$6()Z is not callable for some reason, and I can't find anything more to install.
My pyproject.toml:
[tool.poetry]
name = "..."
version = "1.0.0"
description = "..."
authors = ["..."]
[tool.poetry.dependencies]
python = "3.7.8"
pre-commit = "^2.8.2"
pyspark = {version="3.1.1", optional=true, extras=["sql"]}
findspark = "^1.4.2"
boto3 = "*"
pyarrow = "3.0.0"
[tool.poetry.dev-dependencies]
pytest = "6.1.1"
ipdb = "0.13.3"
pytest-cov = "2.10.1"
Grateful for any help from anyone who has encountered the same issue.
UPDATE
Based on the comment from Alex, I resolved the issue by using the following versions:
- Spark version 3.0.2
- Hadoop version 3.2.0
- Delta 0.8.0
spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" ~/code/dataops-delta-infrastructure/run.py
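This fix is consistent with the Delta Lake release notes, where the 0.7.x and 0.8.x lines target Spark 3.0.x and the 1.0.x line targets Spark 3.1.x. A minimal sketch of a pre-flight check built on that mapping follows; the table is deliberately partial and the names DELTA_TO_SPARK and is_compatible are hypothetical:

```python
# Partial mapping of Delta Lake release lines to the Spark minor version they
# were built against (assumption: taken from the Delta Lake release notes).
DELTA_TO_SPARK = {
    "0.7": "3.0",
    "0.8": "3.0",
    "1.0": "3.1",
}


def _minor(version: str) -> str:
    """Reduce a version such as '3.0.2' to its major.minor part, '3.0'."""
    return ".".join(version.split(".")[:2])


def is_compatible(delta_version: str, spark_version: str) -> bool:
    """True when the Delta release line targets the given Spark minor version."""
    expected = DELTA_TO_SPARK.get(_minor(delta_version))
    if expected is None:
        raise ValueError(f"Unknown Delta release line: {delta_version}")
    return expected == _minor(spark_version)
```

With this mapping, is_compatible("0.8.0", "3.0.2") is True while is_compatible("0.8.0", "3.1.1") is False, which matches the behaviour described in this question.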
