
Basically I am trying to transfer data from Postgres to Redshift using AWS Data Pipeline, and the process I am following is:

  1. Write a pipeline (CopyActivity) that moves data from Postgres to S3 (a rough sketch of this one is shown right after this list)
  2. Write a pipeline (RedshiftCopyActivity) that moves data from S3 to Redshift
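
For reference, here is a rough sketch of what the first (Postgres to S3) pipeline objects look like, in the same style as the second one further down; the ids, the database reference, the S3 path and the selectQuery are illustrative placeholders rather than my exact values:

  # Illustrative sketch only -- ids, database ref, S3 path and query are placeholders.
  # (A dataFormat object may also be needed; it is omitted here.)
  postgres_to_s3_definition = [{
      "id": "postgres_input_data",
      "name": "postgres_input_data",
      "fields": [
          {"key": "type",        "stringValue": "SqlDataNode"},
          {"key": "database",    "refValue": "PostgresDatabaseId_XXXX"},
          {"key": "table",       "stringValue": "company"},
          {"key": "selectQuery", "stringValue": "select * from company"},
          {"key": "schedule",    "refValue": "DefaultScheduleTime"},
      ]
  },
  {
      "id": "s3_output_data",
      "name": "s3_output_data",
      "fields": [
          {"key": "type",          "stringValue": "S3DataNode"},
          {"key": "directoryPath", "stringValue": "s3://my-bucket/company/"},
          {"key": "schedule",      "refValue": "DefaultScheduleTime"},
      ]
  },
  {
      "id": "CopyPostgresToS3",
      "name": "CopyPostgresToS3",
      "fields": [
          {"key": "type",     "stringValue": "CopyActivity"},
          {"key": "input",    "refValue": "postgres_input_data"},
          {"key": "output",   "refValue": "s3_output_data"},
          {"key": "runsOn",   "refValue": "ResourceId_XXXX"},
          {"key": "schedule", "refValue": "DefaultScheduleTime"},
      ]
  }]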

In my case both pipelines run perfectly, but the problem is that the data ends up duplicated in the Redshift database.

For example, below is the data from the Postgres database in a table called company:

[screenshot: rows of the company table in Postgres]

After a successful run of the S3 to Redshift (RedshiftCopyActivity) pipeline, the data was copied but it was duplicated, as below:

[screenshot: the same rows duplicated in the Redshift company table]

Below is the relevant part of the definition from the RedshiftCopyActivity (S3 to Redshift) pipeline:

  pipeline_definition = [{
      # Output data node: the target company table in Redshift
      "id": "redshift_database_instance_output",
      "name": "redshift_database_instance_output",
      "fields": [
          {"key": "database",    "refValue": "RedshiftDatabaseId_S34X5"},
          {"key": "primaryKeys", "stringValue": "id"},
          {"key": "type",        "stringValue": "RedshiftDataNode"},
          {"key": "tableName",   "stringValue": "company"},
          {"key": "schedule",    "refValue": "DefaultScheduleTime"},
          {"key": "schemaName",  "stringValue": RedShiftSchemaName},  # Python variable holding the schema name
      ]
  },
  {
      # Copy activity: reads from the S3 data node and writes to the Redshift data node
      "id": "CopyS3ToRedshift",
      "name": "CopyS3ToRedshift",
      "fields": [
          {"key": "output",     "refValue": "redshift_database_instance_output"},
          {"key": "input",      "refValue": "s3_input_data"},
          {"key": "runsOn",     "refValue": "ResourceId_z9RNH"},
          {"key": "type",       "stringValue": "RedshiftCopyActivity"},
          {"key": "insertMode", "stringValue": "KEEP_EXISTING"},
          {"key": "schedule",   "refValue": "DefaultScheduleTime"},
      ]
  }]
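
For context, a definition like this can be pushed to Data Pipeline with boto3 roughly as below (a minimal sketch; the pipeline id is a placeholder for the one returned by create_pipeline):

  import boto3

  client = boto3.client("datapipeline")

  # "df-XXXXXXXXXXXX" is a placeholder pipeline id
  client.put_pipeline_definition(
      pipelineId="df-XXXXXXXXXXXX",
      pipelineObjects=pipeline_definition,
  )
  client.activate_pipeline(pipelineId="df-XXXXXXXXXXXX")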

According to the docs of RedshiftCopyActivity, we need to use insertMode to describe how the data should behave (inserted/updated/deleted) when it is copied into the database table:

insertMode : Determines what AWS Data Pipeline does with pre-existing data in the target table that overlaps with rows in the data to be loaded. Valid values are KEEP_EXISTING, OVERWRITE_EXISTING, TRUNCATE and APPEND. KEEP_EXISTING adds new rows to the table, while leaving any existing rows unmodified. KEEP_EXISTING and OVERWRITE_EXISTING use the primary key, sort, and distribution keys to identify which incoming rows to match with existing rows, according to the information provided in Updating and inserting new data in the Amazon Redshift Database Developer Guide. TRUNCATE deletes all the data in the destination table before writing the new data. APPEND will add all records to the end of the Redshift table. APPEND does not require a primary, distribution key, or sort key so items that may be potential duplicates may be appended.
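
In other words, with the primary key available, KEEP_EXISTING / OVERWRITE_EXISTING should behave like the staging-table merge that the Redshift guide describes. Below is a minimal sketch of that merge pattern, assuming psycopg2 and placeholder connection details, S3 path and IAM role (this is the behaviour I expect, not what Data Pipeline literally executes):

  # Sketch of the merge described in Redshift's "Updating and inserting new data" guide;
  # connection details, the S3 path and the IAM role are placeholders.
  import psycopg2

  conn = psycopg2.connect(host="my-cluster.xxxx.redshift.amazonaws.com",
                          port=5439, dbname="mydb", user="myuser", password="...")
  cur = conn.cursor()

  # Load the incoming S3 data into a staging table shaped like the target
  cur.execute("CREATE TEMP TABLE company_staging (LIKE myschema.company);")
  cur.execute("""
      COPY company_staging FROM 's3://my-bucket/company/'
      IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-role'
      DELIMITER ',';
  """)

  # Delete target rows that overlap on the primary key, then insert everything
  # from staging -- the net effect of OVERWRITE_EXISTING
  cur.execute("""
      DELETE FROM myschema.company
      USING company_staging
      WHERE myschema.company.id = company_staging.id;
  """)
  cur.execute("INSERT INTO myschema.company SELECT * FROM company_staging;")
  conn.commit()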

So my requirements are:

  1. When copying from Postgres (in fact the data is in S3 now) to the Redshift database, if a row already exists then just update it
  2. If there are new records in S3, then create new records in Redshift

But for me, even though I have used KEEP_EXISTING or OVERWRITE_EXISTING, the data just keeps repeating over and over again, as shown in the Redshift screenshot above.

So finally, how can I achieve my requirements? Are there any tweaks or settings I still need to add to my configuration?

Edit

Table(company) definition from redshift

[screenshot: company table definition in Redshift]

Can you provide your table definition in Redshift? – Maxime
@Maxime I have edited my question with the table definition, please check above. – Shiva Krishna Bavandla
Since you have a primary key defined, try getting rid of "key": "primaryKeys", "stringValue": "id". Also, what are the permissions of the user running the copy activity? Can it create temporary tables? – Maxime
@Maxime OK, so do you want me to remove "key": "primaryKeys", "stringValue": "id" from the pipeline definition? And yes, it can create temporary tables too, so can I know what needs to be done now? – Shiva Krishna Bavandla
Well, I don't know for sure what needs to be done, I'm just suggesting what you can try, as it has always worked for me and I have never faced this issue. Removing the primaryKeys from the pipeline definition might help. – Maxime

1 Answer


If you want to avoid duplication, you must define a primary key on the table in Redshift and also set insertMode to "OVERWRITE_EXISTING".
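
A minimal sketch of what that looks like, reusing the objects from the question (the CREATE TABLE column types are placeholders):

  # RedshiftCopyActivity object with insertMode switched to OVERWRITE_EXISTING;
  # everything else is the same as in the question.  The table itself should be
  # created with the primary key declared, e.g. (placeholder column types):
  #   CREATE TABLE myschema.company (id INTEGER PRIMARY KEY, name VARCHAR(255));
  copy_s3_to_redshift = {
      "id": "CopyS3ToRedshift",
      "name": "CopyS3ToRedshift",
      "fields": [
          {"key": "type",       "stringValue": "RedshiftCopyActivity"},
          {"key": "input",      "refValue": "s3_input_data"},
          {"key": "output",     "refValue": "redshift_database_instance_output"},
          {"key": "runsOn",     "refValue": "ResourceId_z9RNH"},
          {"key": "insertMode", "stringValue": "OVERWRITE_EXISTING"},
          {"key": "schedule",   "refValue": "DefaultScheduleTime"},
      ]
  }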