1 vote

I'm trying to transfer data from Amazon S3 to Amazon Redshift with the AWS Data Pipeline tool.

Is it possible to modify the data during the transfer, e.g. with an SQL statement, so that only the results of that SQL statement become the input into Redshift?

So far I have only found the plain copy definition, like:

{
  "id": "S3Input",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "filePath": "s3://example-bucket/source/inputfile.csv"
},

Source: https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-get-started-copy-data-cli.html


3 Answers

5 votes

Yes, it is possible. There are two approaches to it:

  1. Use transformSql of RedshiftCopyActivity

transformSql is useful if the transformation is limited to the batch of records being loaded on a schedule, e.g. every day or hour. That way the changes are applied only to the batch, not to the whole table.

Here is an excerpt from the documentation:

transformSql: The SQL SELECT expression used to transform the input data. When you copy data from DynamoDB or Amazon S3, AWS Data Pipeline creates a table called staging and initially loads it in there. Data from this table is used to update the target table. If the transformSql option is specified, a second staging table is created from the specified SQL statement. The data from this second staging table is then updated in the final target table. So transformSql must be run on the table named staging and the output schema of transformSql must match the final target table's schema.

Please find an example usage of transformSql below. Notice that the select is from the staging table. It will effectively run CREATE TEMPORARY TABLE staging2 AS SELECT <...> FROM staging;. Also, all fields must be included and must match the existing table in the Redshift database.

{
  "id": "LoadUsersRedshiftCopyActivity",
  "name": "Load Users",
  "insertMode": "OVERWRITE_EXISTING",
  "transformSql": "SELECT u.id, u.email, u.first_name, u.last_name, u.admin, u.guest, CONVERT_TIMEZONE('US/Pacific', cs.created_at_pst) AS created_at_pst, CONVERT_TIMEZONE('US/Pacific', cs.updated_at_pst) AS updated_at_pst FROM staging u;",
  "type": "RedshiftCopyActivity",
  "runsOn": {
    "ref": "OregonEc2Resource"
  },
  "schedule": {
    "ref": "HourlySchedule"
  },
  "input": {
    "ref": "OregonUsersS3DataNode"
  },
  "output": {
    "ref": "OregonUsersDashboardRedshiftDatabase"
  },
  "onSuccess": {
    "ref": "LoadUsersSuccessSnsAlarm"
  },
  "onFail": {
    "ref": "LoadUsersFailureSnsAlarm"
  },
  "dependsOn": {
    "ref": "BewteenRegionsCopyActivity"
  }
}

  2. Use script of SqlActivity

SqlActivity allows operations on the whole dataset and can be scheduled to run after particular events through the dependsOn mechanism.

{
  "name": "Add location ID",
  "id": "AddCardpoolLocationSqlActivity",
  "type": "SqlActivity",
  "script": "INSERT INTO locations (id) SELECT 100000 WHERE NOT EXISTS (SELECT * FROM locations WHERE id = 100000);",
  "database": {
    "ref": "DashboardRedshiftDatabase"
  },
  "schedule": {
    "ref": "HourlySchedule"
  },
  "output": {
    "ref": "LocationsDashboardRedshiftDatabase"
  },
  "runsOn": {
    "ref": "OregonEc2Resource"
  },
  "dependsOn": {
    "ref": "LoadLocationsRedshiftCopyActivity"
  }
}

0 votes

There is an optional field in RedshiftCopyActivity called 'transformSql'.

http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-redshiftcopyactivity.html

I have not personally used this, but from the looks of it, your S3 data is treated as if it were in a temporary table, and this SQL statement returns the transformed rows for Redshift to insert.

So you will need to list every field in the select, whether or not you are transforming it.
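
As a sketch (hypothetical column names; the staging table name comes from the documentation quoted in the answer above):

-- every target-table column is listed, even the untouched ones
SELECT id,
       email,                                -- passed through unchanged
       UPPER(country_code) AS country_code,  -- transformed
       created_at                            -- passed through unchanged
FROM staging;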

0 votes

AWS Data Pipeline SqlActivity

{
  "id" : "MySqlActivity",
  "type" : "SqlActivity",
  "database" : { "ref": "MyDatabase" },
  "script" : "insert into AnalyticsTable (select (cast(requestEndTime as bigint) - cast(requestBeginTime as bigint)) as requestTime, hostname from StructuredLogs where hostname LIKE '%.domain.sfx');",
  "schedule" : { "ref": "Hour" },
  "queue" : "priority"
}

So basically, in "script" you can put any SQL script, transformation, or command supported by Amazon Redshift (see the Amazon Redshift SQL Commands reference).

transformSql is fine, but it supports only an SQL SELECT expression used to transform the input data (ref: RedshiftCopyActivity).
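
To illustrate the difference (hypothetical tables and columns; both snippets are plain Redshift SQL):

-- transformSql accepts only a single SELECT over the implicit staging table:
SELECT id, LOWER(email) AS email FROM staging;

-- a SqlActivity script may contain any Redshift SQL, e.g. cleanup plus aggregation:
DELETE FROM events WHERE event_date < DATEADD(day, -30, GETDATE());
INSERT INTO events_summary SELECT event_date, COUNT(*) FROM events GROUP BY event_date;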