4
votes

I have created a Glue job that copies data from S3 (csv file) to Redshift. It works and populates the desired table.

However, I need to purge the table during this process as I am left with duplicate records after the process completes.

I'm looking for a way to add this purge to the Glue process. Any advice would be appreciated.

Thanks.

5
Possible duplicate of Overwrite MySQL tables with AWS Glue (comment by hoaxz)

5 Answers

2
votes

You can alter the Glue script to perform a "preaction" before insertion as explained here:

https://aws.amazon.com/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red",
    connection_options = {"preactions": "truncate table target_table;", "dbtable": "target_table", "database": "redshiftdb"},
    redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

For instance, in my script, which was mostly based on the defaults, I inserted a new DataSink before the last DataSink (I've replaced some of my details with {things}):

## @type: DataSink
## @args: [catalog_connection = "redshift-data-live", connection_options = {"dbtable": "{DBTABLE}", "database": "{DBNAME}"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "redshift-data-live",
    connection_options = {"preactions": "truncate table {TABLENAME};", "dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"},
    redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
## @type: DataSink
## @args: [catalog_connection = "redshift-data-live", connection_options = {"dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink5
## @inputs: [frame = datasink4]
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasink4, catalog_connection = "redshift-data-live",
    connection_options = {"dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"},
    redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()
2
votes

The link @frobinrobin provided is out of date. I tried many times, and the preactions statements were skipped even when I supplied incorrect syntax, so I ended up with duplicated rows (the insert action did execute!).

Try this:

Just replace glueContext.write_dynamic_frame.from_jdbc_conf() from the link above with glueContext.write_dynamic_frame_from_jdbc_conf() and it will work!

At least this helped me out in my case (my AWS Glue job was just inserting data into Redshift without executing the truncate table preactions).
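
Here is a minimal sketch of what that change looks like, reusing the placeholder connection and table names from the answer above (they are not from my job, so adjust them to yours):

# Sketch only: note the underscores -- write_dynamic_frame_from_jdbc_conf()
# is called directly on the GlueContext rather than via the write_dynamic_frame writer.
datasink4 = glueContext.write_dynamic_frame_from_jdbc_conf(
    frame = dropnullfields3,
    catalog_connection = "redshift-data-live",
    connection_options = {
        "preactions": "truncate table {SCHEMA.TABLENAME};",  # placeholder table name
        "dbtable": "{SCHEMA.TABLENAME}",
        "database": "{DB}"
    },
    redshift_tmp_dir = args["TempDir"],
    transformation_ctx = "datasink4")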

0
votes

Did you have a look at Job Bookmarks in Glue? It's a feature for keeping track of the high-water mark and it works with S3 only. I am not 100% sure, but it may require partitioning to be in place.
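
Roughly, bookmarks are enabled on the job itself and rely on a transformation_ctx on the source, e.g. (placeholder database/table names, not from your job):

# Sketch, assuming job bookmarks are enabled on the job
# (job argument --job-bookmark-option set to job-bookmark-enable).
# The transformation_ctx is what Glue uses to remember which S3 objects it has already processed.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "{DB}",            # placeholder catalog database
    table_name = "{CSV_TABLE}",   # placeholder catalog table over the S3 CSV files
    transformation_ctx = "datasource0")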

0
votes

You need to modify the auto-generated code provided by Glue: connect to Redshift using a Spark JDBC connection and execute the purge query, for example as sketched below.
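
A rough sketch of that idea (the JDBC URL, credentials and table name are placeholders, and it assumes the Redshift JDBC driver is available to the job):

# Sketch only: run the purge over JDBC from the driver before writing the frame.
# jdbc_url, db_user, db_password and the table name are placeholders.
conn = sc._jvm.java.sql.DriverManager.getConnection(jdbc_url, db_user, db_password)
stmt = conn.createStatement()
stmt.execute("truncate table {SCHEMA.TABLENAME};")
stmt.close()
conn.close()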

To spin up the Glue containers in the Redshift VPC, specify the connection in the Glue job so that it can access the Redshift cluster.

Hope this helps.

-1
votes

You can use the Spark/PySpark Databricks Redshift library to do an append after truncating the table (this performs better than an overwrite):

preactions = "TRUNCATE table <schema.table>" 
df.write\
  .format("com.databricks.spark.redshift")\
  .option("url", redshift_url)\
  .option("dbtable", redshift_table)\
  .option("user", user)\
  .option("password", readshift_password)\
  .option("aws_iam_role", redshift_copy_role)\
  .option("tempdir", args["TempDir"])\
  .option("preactions", preactions)\
  .mode("append")\
  .save()

You can take a look at the Databricks documentation here.