26
votes

Here is how I have things set up:

  • I have CSV files uploaded to S3 and a Glue crawler set up to create the table and schema.
  • I have a Glue job set up that writes the data from the Glue table to our Amazon Redshift database using a JDBC connection. The job is also in charge of mapping the columns and creating the Redshift table.

When I re-run a job, I get duplicate rows in Redshift (as expected). However, is there a way to replace or delete rows before inserting the new data, using a key or the partitions set up in Glue?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields

from pyspark.sql.functions import lit

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columnMapping = [
    ("id", "int", "id", "int"),
    ("name", "string", "name", "string"),
]

datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource1")

applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")
df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")

## Write data to redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()
6
Good question, running into the same issue right now. Did you make any progress thus far? - Matthijs
I was in contact with AWS Glue Support and was able to get a workaround. It does not appear that Glue has a way to do this, or it was never meant for this type of work. The way I got a working solution was to have Glue insert all rows into a staging table and then perform an upsert/merge outside of Glue. - krchun

6 Answers

18
votes

Job bookmarks are the key. Just edit the job and enable "Job bookmarks" and it won't process already processed data. Note that the job has to rerun once before it will detect it does not have to reprocess the old data again.

For more info see: http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

The name "bookmark" is a bit far-fetched in my opinion. I would never have looked at it if I had not stumbled upon it by coincidence during my search.
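As a rough sketch of the bookmark setting described above: besides the console checkbox, bookmarks can be switched on per run through the job argument `--job-bookmark-option`. The helper name and the job name below are hypothetical. Bookmarks track state per `transformation_ctx` and are saved by `job.commit()`, both of which the script in the question already has.

```python
def bookmark_run_arguments(job_name):
    """Build keyword arguments for a Glue StartJobRun call with bookmarks enabled.

    The helper itself is hypothetical; the argument key and value are the
    Glue job parameter that controls bookmarks.
    """
    return {
        "JobName": job_name,
        "Arguments": {"--job-bookmark-option": "job-bookmark-enable"},
    }


kwargs = bookmark_run_arguments("my-glue-job")  # hypothetical job name
print(kwargs)

# With boto3 installed and credentials configured, the same dict could be
# passed on like this:
#   import boto3
#   boto3.client("glue").start_job_run(**kwargs)
```

Bookmarks only stop the job from reading source data it has already processed. They do not remove rows that are already duplicated in Redshift.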

6
votes

This was the solution I got from AWS Glue Support:

As you may know, although you can create primary keys, Redshift doesn't enforce uniqueness. Therefore, if you are rerunning Glue jobs then duplicate rows can get inserted. Some of the ways to maintain uniqueness are:

  1. Use a staging table to insert all rows and then perform an upsert/merge [1] into the main table. This has to be done outside of Glue.

  2. Add another column to your Redshift table [2], such as an insert timestamp, to allow duplicates but still know which row came first or last, and then delete the duplicates afterwards if you need to.

  3. Load the previously inserted data into a DataFrame and compare it with the data to be inserted, to avoid inserting duplicates [3].

[1] - http://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-upsert.html and http://www.silota.com/blog/amazon-redshift-upsert-support-staging-table-replace-rows/

[2] - https://github.com/databricks/spark-redshift/issues/238

[3] - https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html
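To make option 1 more concrete, here is a minimal sketch that builds the delete-then-insert statements for a staging-table merge. The function name and the staging table name `table01_staging` are assumptions for illustration; `table01` and the `id` key come from the question. It only produces SQL strings, so it still needs something outside Glue (or a post-write hook) to execute them against Redshift.

```python
def build_merge_statements(target, staging, key):
    """Return SQL statements that replace matching rows in `target`
    with the rows from `staging`, all inside one transaction."""
    return [
        "BEGIN",
        # Remove rows in the target that are about to be re-inserted.
        f"DELETE FROM {target} USING {staging} "
        f"WHERE {target}.{key} = {staging}.{key}",
        # Copy every staged row into the target.
        f"INSERT INTO {target} SELECT * FROM {staging}",
        # Empty the staging table for the next run.
        f"DELETE FROM {staging}",
        "END",
    ]


statements = build_merge_statements("table01", "table01_staging", "id")
print(";\n".join(statements) + ";")
```

The identifiers are interpolated directly into the SQL, so this should only be used with table and column names you control.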

4
votes

Please check this answer. It has an explanation and a code sample showing how to upsert data into Redshift using a staging table. The same approach can be used to run any SQL queries before or after Glue writes the data, using the preactions and postactions options:

// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "preactions" -> "<another SQL queries>",
    "postactions" -> "<some SQL queries>"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)
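For the Python job in the question, the same options can probably be passed through `connection_options`. The sketch below only builds that dictionary; the helper name is hypothetical, and the `DELETE` preaction is an assumed example that clears the rows tagged with `platform = 'test'` (the column the question's script adds) before the new rows are written.

```python
def redshift_sink_options(database, table, preactions=None, postactions=None):
    """Build a connection_options dict for a Glue Redshift write.

    `preactions` and `postactions` are semicolon-separated SQL strings
    that run before and after the load, respectively.
    """
    options = {"database": database, "dbtable": table}
    if preactions:
        options["preactions"] = preactions
    if postactions:
        options["postactions"] = postactions
    return options


options = redshift_sink_options(
    "db01",
    "table01",
    preactions="DELETE FROM table01 WHERE platform = 'test';",
)
print(options)

# Inside the Glue job from the question, this would replace the inline dict:
#   glueContext.write_dynamic_frame.from_jdbc_conf(
#       frame=data1,
#       catalog_connection="Test Connection",
#       connection_options=options,
#       redshift_tmp_dir=args["TempDir"],
#       transformation_ctx="datasink1",
#   )
```

Note that a `DELETE` preaction like this one fails on the very first run if the target table does not exist yet, so the table may need to be created beforehand.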
2
votes

I tested this today and found a workaround to update or delete rows in the target table over a JDBC-style connection from within the job.

I used the following:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

import pg8000
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'PW',
    'HOST',
    'USER',
    'DB'
])
# ...
# Create Spark & Glue context

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ...
config_port = ****
conn = pg8000.connect(
    database=args['DB'], 
    user=args['USER'], 
    password=args['PW'],
    host=args['HOST'],
    port=config_port
)
query = "UPDATE table .....;"

cur = conn.cursor()
cur.execute(query)
conn.commit()
cur.close()



# Redshift uses DELETE ... USING for deletes that join against another table
query1 = "DELETE FROM AAA USING BBB WHERE AAA.id = BBB.id"

cur1 = conn.cursor()
cur1.execute(query1)
conn.commit()
cur1.close()
conn.close()
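One possible refinement of the snippet above is to run the related statements in a single transaction and roll back if any of them fails, so the target table is never left half-updated. The helper below is a sketch, not part of the original answer. It works with any DB-API connection; it is demonstrated here with the standard-library `sqlite3` module and made-up `target` and `staging` tables so it can run without a Redshift cluster.

```python
import sqlite3


def run_in_transaction(conn, statements):
    """Execute all statements on a DB-API connection, committing only
    if every one succeeds and rolling back otherwise."""
    cur = conn.cursor()
    try:
        for statement in statements:
            cur.execute(statement)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()


# Demo data: one row to be replaced, one to keep, one to add.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (id INTEGER, name TEXT)")
conn.execute("CREATE TABLE staging (id INTEGER, name TEXT)")
conn.execute("INSERT INTO target VALUES (1, 'old'), (2, 'keep')")
conn.execute("INSERT INTO staging VALUES (1, 'new'), (3, 'added')")
conn.commit()

run_in_transaction(conn, [
    "DELETE FROM target WHERE id IN (SELECT id FROM staging)",
    "INSERT INTO target SELECT id, name FROM staging",
])

rows = conn.execute("SELECT id, name FROM target ORDER BY id").fetchall()
print(rows)
```

With pg8000, the connection created by `pg8000.connect(...)` in the answer could be passed in place of the `sqlite3` connection.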
0
votes

The job bookmarking option in Glue should do the trick, as suggested above. I have been using it successfully when my source is S3. http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

0
votes

In my testing (with the same scenario), the bookmark functionality did not work: duplicate data was inserted when the job was run multiple times. I resolved the issue by removing the files from the S3 location daily (through a Lambda function) and implementing staging and target tables. Data is then inserted or updated based on the matching key columns.