1 vote

I am trying to load data from CSV files in S3 buckets into Snowflake using a Glue ETL job. I wrote a Python script within the ETL job for this, as below:

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from py4j.java_gateway import java_import
    SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

    ## @params: [JOB_NAME, URL, ACCOUNT, WAREHOUSE, DB, SCHEMA, USERNAME, PASSWORD]
    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'URL', 'ACCOUNT', 'WAREHOUSE', 'DB', 'SCHEMA', 'USERNAME', 'PASSWORD'])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    java_import(spark._jvm, "net.snowflake.spark.snowflake")

    spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

    sfOptions = {
        "sfURL" : args['URL'],
        "sfAccount" : args['ACCOUNT'],
        "sfUser" : args['USERNAME'],
        "sfPassword" : args['PASSWORD'],
        "sfDatabase" : args['DB'],
        "sfSchema" : args['SCHEMA'],
        "sfWarehouse" : args['WAREHOUSE'],
    }

    dyf = glueContext.create_dynamic_frame.from_catalog(database = "salesforcedb", table_name = "pr_summary_csv", transformation_ctx = "dyf")
    df = dyf.toDF()
    ## df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("parallelism", "8").option("dbtable", "abcdef").mode("overwrite").save()
    df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "abcdef").save()
    job.commit()

The error thrown is:

error occurred while calling o81.save. Incorrect username or password was specified.

However, if I don't convert to a Spark DataFrame and instead use the DynamicFrame directly, I get an error like this:

AttributeError: 'function' object has no attribute 'format'

Could someone please look over my code and tell me what I'm doing wrong in converting a DynamicFrame to a DataFrame? Please let me know if I need to provide more information.

BTW, I am a newbie to Snowflake and this is my first try at loading data through AWS Glue.

Did you make a bucket with the Snowflake driver available for Glue, so Glue can install the driver? There is a great guide available here: snowflake.com/blog/how-to-use-aws-glue-with-snowflake – Blokje5
Can you try printing the user name and password to see if you are reading the arguments properly? – Prabhakar Reddy

2 Answers

0 votes

error occurred while calling o81.save. Incorrect username or password was specified.

The error message says that there's a problem with the user or the password. If you are sure that the username and password are correct, please make sure that the Snowflake account name and URL are also correct.
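As a rough illustration (the values below are placeholders, not real settings), the sfURL is usually the full account host name without the protocol, while sfAccount is just the account locator:

    # Illustrative placeholders only -- substitute your own account locator, region and credentials.
    sfOptions = {
        "sfURL"      : "xy12345.eu-west-1.snowflakecomputing.com",  # host name, no "https://"
        "sfAccount"  : "xy12345",                                   # account locator only
        "sfUser"     : "GLUE_USER",
        "sfPassword" : "********",
        "sfDatabase" : "MY_DB",
        "sfSchema"   : "PUBLIC",
        "sfWarehouse": "MY_WH",
    }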

However, if I don't convert to a Spark DataFrame and instead use the DynamicFrame directly, I get an error like this:

AttributeError: 'function' object has no attribute 'format'

The Glue DynamicFrame's write method is different from the Spark DataFrame's, so it's normal that it doesn't have the same methods; dyf.write takes connection parameters rather than behaving like the Spark DataFrameWriter, which is why the .format call fails. Please check the documentation:

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-write

It seems you need to give the parameters as connection_options:

    write(connection_type, connection_options, format, format_options, accumulator_size)

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": "password", "dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
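As a rough, untested sketch of what that could look like for this job (the "custom.spark" connection type and "className" option assume a Snowflake Spark connector registered with Glue; they are illustrative, not verified settings):

    # Untested sketch: write the DynamicFrame via connection_options instead of the
    # Spark DataFrameWriter API. Assumes a Snowflake Spark connector is available to Glue.
    glueContext.write_dynamic_frame.from_options(
        frame = dyf,
        connection_type = "custom.spark",
        connection_options = {**sfOptions, "className": "net.snowflake.spark.snowflake", "dbtable": "abcdef"},
        transformation_ctx = "write_dyf"
    )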

Even if you use the DynamicFrame, you will probably end up with the same incorrect username or password error, so I suggest you focus on fixing the credentials first.
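A quick way to check that the credentials actually reach the job is a minimal sketch like the one below, using the args your script already reads (it avoids printing the password itself):

    # Sanity check: confirm the Glue job arguments are resolved as expected.
    # Print only non-secret values; the password length is enough to spot an empty value.
    print("sfURL:", args['URL'])
    print("sfAccount:", args['ACCOUNT'])
    print("sfUser:", args['USERNAME'])
    print("sfPassword length:", len(args['PASSWORD']))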

0 votes

Here is the tested Glue code (you can copy-paste it as is, only change the table name), which you can use for setting up the Glue ETL job. You will have to add the JDBC and Spark connector JARs. You can use the link below for the setup: https://community.snowflake.com/s/article/How-To-Use-AWS-Glue-With-Snowflake
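As a rough example (the bucket and file names are placeholders), the JARs are usually supplied through the job's dependent JARs path, i.e. the --extra-jars job parameter:

    --extra-jars s3://<your-bucket>/snowflake-jdbc-<version>.jar,s3://<your-bucket>/spark-snowflake_<scala-version>-<version>.jar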


    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from py4j.java_gateway import java_import
    SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

    ## @params: [JOB_NAME, URL, ACCOUNT, WAREHOUSE, DB, SCHEMA, USERNAME, PASSWORD]
    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'URL', 'ACCOUNT', 'WAREHOUSE', 'DB', 'SCHEMA', 'USERNAME', 'PASSWORD'])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)

    ## uj = sc._jvm.net.snowflake.spark.snowflake
    spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

    sfOptions = {
        "sfURL" : args['URL'],
        "sfAccount" : args['ACCOUNT'],
        "sfUser" : args['USERNAME'],
        "sfPassword" : args['PASSWORD'],
        "sfDatabase" : args['DB'],
        "sfSchema" : args['SCHEMA'],
        "sfWarehouse" : args['WAREHOUSE'],
    }

    ## Read from a Snowflake table into a Spark Data Frame
    df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "Select * from <tablename>").load()
    df.show()

    ## Perform any kind of transformations on your data and save as a new Data Frame:
    ## df1 = df.[Insert any filter, transformation, or other operation]

    ## Write the Data Frame contents back to Snowflake in a new table:
    ## df1.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "[new_table_name]").mode("overwrite").save()

    job.commit()
## Write the Data Frame contents back to Snowflake in a new table df1.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "[new_table_name]").mode("overwrite").save() job.commit()