
I am trying to run an ETL job on AWS Glue, extracting data from MongoDB into a Spark DataFrame and loading it into Snowflake.

This is a sample schema of the Spark DataFrame:

|-- login: struct (nullable = true)
 |    |-- login_attempts: integer (nullable = true)
 |    |-- last_attempt: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- notifications: struct (nullable = true)
 |    |-- bot_review_queue: boolean (nullable = true)
 |    |-- bot_review_queue_web_push: boolean (nullable = true)
 |    |-- bot_review_queue_web_push_admin: boolean (nullable = true)
 |    |-- weekly_account_summary: struct (nullable = true)
 |    |    |-- enabled: boolean (nullable = true)
 |    |-- weekly_summary: struct (nullable = true)
 |    |    |-- enabled: boolean (nullable = true)
 |    |    |-- day: integer (nullable = true)
 |    |    |-- hour: integer (nullable = true)
 |    |    |-- minute: integer (nullable = true)
 |-- query: struct (nullable = true)
 |    |-- email_address: string (nullable = true)

I am trying to load the data into Snowflake as-is, with the struct columns as JSON payloads in Snowflake, but it throws the following error:

An error occurred while calling o81.collectToPython. com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast ARRAY into a StructType

I also tried casting the struct columns to string before loading, but it throws more or less the same error:

An error occurred while calling o106.save.  com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a StructType

I would really appreciate some help with this.

Code for casting and loading below:

from pyspark.sql.types import StringType

dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="mongodb",
    connection_options=read_mongo_options
)
user_df = dynamic_frame.toDF()

user_df_cast = user_df.select(
    user_df.login.cast(StringType()),
    'name',
    user_df.notifications.cast(StringType())
)

user_df_cast.write \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "users") \
    .mode("append") \
    .save()
That error makes it look like Snowflake isn't even seeing the command; can you check your query history to confirm? Also, it might make sense to run a debug command after you set user_df_cast, maybe check the "shape": print((df.count(), len(df.columns))) – Rich Murnane

1 Answer


If your users table in Snowflake has the following schema, then no casting is required: the StructType fields of a Spark DataFrame map to Snowflake's VARIANT type automatically:

CREATE TABLE users (
    login VARIANT
   ,name STRING
   ,notifications VARIANT
   ,query VARIANT
)

Just do the following; no transformation is required, because the Snowflake Spark Connector understands the data types and converts them to the appropriate JSON representation on its own:

user_df = glueContext.create_dynamic_frame.from_options(
    connection_type="mongodb",
    connection_options=read_mongo_options
)

user_df \
    .toDF() \
    .write \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "users") \
    .mode("append") \
    .save()

If you absolutely need to store the StructType fields as plain JSON strings, you'll need to explicitly transform them using the to_json SparkSQL function:

from pyspark.sql.functions import to_json

# Note: user_df must be a Spark DataFrame here (call .toDF() on the
# DynamicFrame first). The aliases keep the output column names matching
# the target table's columns.
user_df_cast = user_df.select(
    to_json(user_df.login).alias("login"),
    user_df.name,
    to_json(user_df.notifications).alias("notifications")
)

This stores the JSON as plain VARCHAR values, which means you cannot leverage Snowflake's semi-structured data storage and querying capabilities directly: every access requires a PARSE_JSON step, which is inefficient.
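For comparison, this is what querying such a VARCHAR column looks like: each access has to be wrapped in PARSE_JSON before the path operator can be applied (the table name users_str below is hypothetical, standing in for a users table whose columns were loaded as VARCHAR):

```sql
-- Hypothetical table where notifications was stored as a VARCHAR JSON string
SELECT
    PARSE_JSON(notifications):weekly_summary.enabled
FROM users_str;
```

Snowflake re-parses the string on every query, whereas a VARIANT column is stored in an optimized columnar form.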

Consider using the VARIANT approach shown above, which will allow you to perform queries on the fields directly:

SELECT
    login:login_attempts
   ,login:last_attempt
   ,name
   ,notifications:weekly_summary.enabled
FROM users