
I am saving a Spark DataFrame to S3 (as a CSV file) using the following code:

import traceback

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Attached the spark submit command used
# spark-submit --master local[1] --packages org.apache.hadoop:hadoop-aws:2.7.3,
# com.amazonaws:aws-java-sdk-s3:1.11.98 my_file.py

ACCESS_KEY_ID = "xxxxxxxxxx"
SECRET_ACCESS_KEY = "yyyyyyyyyyyyy"
BUCKET_NAME = "zzzz"

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL data source example") \
    .getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
df.show()

try:
    spark.conf.set("fs.s3n.awsAccessKeyId", ACCESS_KEY_ID)
    spark.conf.set("fs.s3n.awsSecretAccessKey", SECRET_ACCESS_KEY)
    spark.conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

    output_directory = 's3n://' + BUCKET_NAME + '/' + str("azure_dbs")
    df.write.save(output_directory + '_csv', format='csv', header=True, mode="overwrite")
    print("Written successful")
except Exception as exp:
    print("Exception occurred")
    print(exp)
    print(traceback.format_exc())

When I run it from my local system (using spark-submit), it writes to S3 successfully. The spark-submit command used is:

spark-submit --master local[1] --packages org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk-s3:1.11.98 my_file.py

But when I run this as a job from an Azure Databricks notebook, with those packages added as dependencies to the job, I get the following error.


    py4j.protocol.Py4JJavaError: An error occurred while calling o252.save.
    : java.lang.VerifyError: Bad type on operand stack
    Exception Details:
      Location:
        org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V @152: invokevirtual
      Reason:
        Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not assignable to 'org/jets3t/service/model/StorageObject'
      Current Frame:
        bci: @152
        flags: { }
        locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
        stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', integer }
      Bytecode:
        0x0000000: b200 41b9 0067 0100 9900 36b2 0041 bb00
        0x0000010: 5959 b700 5a12 68b6 005b 2bb6 005b 1269
        0x0000020: b600 5b2c b600 5b12 6ab6 005b 2ab4 0023
        0x0000030: b600 3cb6 005b b600 5cb9 006b 0200 2ab4
        0x0000040: 0011 9900 302a b400 0c2a b400 232b 0101
        0x0000050: 0101 b600 6c4e 2ab4 001c 0994 9e00 162d
        0x0000060: b600 6d2a b400 1c94 9e00 0a2a 2d2c b600
        0x0000070: 6eb1 bb00 2a59 2cb7 002b 4e2d 2ab4 001f
        0x0000080: b600 302a b400 0c2a b400 23b6 003c 2b2a
        0x0000090: b400 23b6 003c 2d03 b600 6f57 a700 0a4e
        0x00000a0: 2a2d 2bb7 0035 b1                      
      Exception Handler Table:
        bci [0, 113] => handler: 159
        bci [114, 156] => handler: 159
      Stackmap Table:
        same_frame(@62)
        same_frame(@114)
        same_locals_1_stack_item_frame(@159,Object[#216])
        same_frame(@166)

        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:342)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:332)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
        at com.databricks.sql.transaction.tahoe.DeltaTableUtils$.findDeltaTableRoot(DeltaTable.scala:103)
        at com.databricks.sql.transaction.tahoe.DeltaValidation$.validateNonDeltaWrite(DeltaValidation.scala:94)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:261)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
        at py4j.Gateway.invoke(Gateway.java:295)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:251)
        at java.lang.Thread.run(Thread.java:748)

(While running this as a notebook job from Azure Databricks, I am not creating a new Spark session as in the local machine scenario; instead, I am using the existing spark session provided by Databricks.)

What is the cause of this error? Do we need any additional packages when running this from Azure Databricks?

Included Spark submit packages:

  • org.apache.hadoop:hadoop-aws:2.7.3
  • com.amazonaws:aws-java-sdk-s3:1.11.98

Local machine:
Python 3.6
Spark version 2.4.4 (using Scala version 2.11.12)

Databricks details:
Cluster information:
5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
Python 3 (3.5)


1 Answer


In Azure Databricks, it seems we need to update the key used when setting the configuration; refer to the answer given by Carlos David Peña.


We need to use the key "spark.hadoop.fs.s3n.impl" instead of "fs.s3n.impl".
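
Here is a minimal sketch of the question's configuration block with that change applied. It reuses the ACCESS_KEY_ID, SECRET_ACCESS_KEY, BUCKET_NAME and df defined in the question; only the impl key is changed, since that is the one the referenced answer covers, and whether the credential keys also need the "spark.hadoop." prefix may depend on the runtime.

# Sketch: same write as in the question, run on the spark session provided by
# Databricks, with the filesystem implementation set via "spark.hadoop.fs.s3n.impl".
spark.conf.set("fs.s3n.awsAccessKeyId", ACCESS_KEY_ID)
spark.conf.set("fs.s3n.awsSecretAccessKey", SECRET_ACCESS_KEY)
spark.conf.set("spark.hadoop.fs.s3n.impl",
               "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

output_directory = "s3n://" + BUCKET_NAME + "/azure_dbs"
df.write.save(output_directory + "_csv", format="csv", header=True, mode="overwrite")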

NOTE: There is no need to explicitly add any dependent libraries to the job (Azure Databricks).