
I'm brand new to AWS Glue and want to create a job that will take a SQL script I've written (an INSERT INTO statement) and populate an empty table I have in Redshift. Is this possible? If so, what is the syntax?

I've started with a test case: copying data from one table in my Redshift cluster to another.

This is the script AWS Glue generated. I selected the "Change Schema" option because I wanted to create a new target dataset.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "dev", table_name = "patients", redshift_tmp_dir = TempDir, transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "dev", table_name = "patients", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("birthdate", "string", "date of birth", "string"), ("_id", "string", "patient id", "string"), ("name_middle", "string", "patient middle name", "string"), ("gender", "string", "gender", "string"), ("name_family", "string", "patient last name", "string"), ("name_given", "string", "patient first name", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("birthdate", "string", "date of birth", "string"), ("_id", "string", "patient id", "string"), ("name_middle", "string", "patient middle name", "string"), ("gender", "string", "gender", "string"), ("name_family", "string", "patient last name", "string"), ("name_given", "string", "patient first name", "string")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["gender", "patient middle name", "patient last name", "patient first name", "patient id", "date of birth"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["gender", "patient middle name", "patient last name", "patient first name", "patient id", "date of birth"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "dev", table_name = "patients_info", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "dev", table_name = "patients_info", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
## @type: DataSink
## @args: [database = "dev", table_name = "patients_info", redshift_tmp_dir = TempDir, transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "dev", table_name = "patients_info", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()
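Most of the generated script's length comes from the ApplyMapping call, which spells out every (source_name, source_type, target_name, target_type) tuple by hand. The sketch below builds the same list from a dict of old-to-new column names. It assumes every column stays a string, as in the generated script. `build_mappings` is a hypothetical helper, not part of the awsglue API.

```python
from typing import Dict, List, Tuple

# One ApplyMapping entry: (source_name, source_type, target_name, target_type)
Mapping = Tuple[str, str, str, str]


def build_mappings(renames: Dict[str, str], col_type: str = "string") -> List[Mapping]:
    """Hypothetical helper: map each source column to its new name, same type on both sides."""
    return [(src, col_type, dst, col_type) for src, dst in renames.items()]


# Old name -> new name, taken from the generated script above
renames = {
    "birthdate": "date of birth",
    "_id": "patient id",
    "name_middle": "patient middle name",
    "gender": "gender",
    "name_family": "patient last name",
    "name_given": "patient first name",
}

mappings = build_mappings(renames)
# The renamed columns are also the list that SelectFields.apply(paths=...) needs
selected = list(renames.values())

# Inside the Glue job (not runnable outside Glue), roughly:
# applymapping1 = ApplyMapping.apply(frame=datasource0, mappings=mappings,
#                                    transformation_ctx="applymapping1")
```

Because dicts keep insertion order in Python 3.7+, the generated tuples come out in the same order as the hand-written list.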

Then I tried a simpler case, which also failed:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

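# args["TempDir"] is used below, so it must be resolved first (the original snippet never defined args)
args = getResolvedOptions(sys.argv, ['TempDir'])
glueContext = GlueContext(SparkContext.getOrCreate())

persons = glueContext.create_dynamic_frame.from_catalog(
    database = "dev",
    table_name = "patients",
    redshift_tmp_dir = args["TempDir"],
    additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})

# This form of print works under both Python 2 and Python 3 Glue jobs
print("Count: {}".format(persons.count()))
persons.printSchema()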
Can you post the query that you want to run? Is your use case just to insert some values into a table that already exists in Redshift? - Prabhakar Reddy
Could you provide more information about the table columns, or a sample of the INSERT statement? - TomTom
I added code @PrabhakarReddy - adajo
I provided more information @TomTom - adajo

1 Answer


You should not use INSERT INTO statements as the way to load data into Redshift; inserting rows that way is very slow compared with a bulk load.

The correct process is:

  1. Write data to s3
  2. Copy data from S3 into Redshift using the Redshift COPY command
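To make step 2 concrete, here is a sketch that assembles the kind of COPY statement that loads files already written to S3. It assumes CSV files and IAM-role authorization. The table name, S3 prefix and role ARN are placeholders, and `build_copy_sql` is a hypothetical helper. Glue's Redshift writer issues its own COPY, so you would not normally write this by hand.

```python
def build_copy_sql(table: str, s3_prefix: str, iam_role_arn: str, fmt: str = "CSV") -> str:
    """Hypothetical helper: build a Redshift COPY statement for files under an S3 prefix."""
    if not s3_prefix.startswith("s3://"):
        raise ValueError("s3_prefix must be an s3:// URI")
    # COPY <table> FROM '<s3 prefix>' IAM_ROLE '<role arn>' FORMAT AS <format>;
    return (
        f"COPY {table} "
        f"FROM '{s3_prefix}' "
        f"IAM_ROLE '{iam_role_arn}' "
        f"FORMAT AS {fmt};"
    )


sql = build_copy_sql(
    "public.patients_info",                    # placeholder target table
    "s3://my-bucket/patients/",                # placeholder S3 prefix from step 1
    "arn:aws:iam::account-id:role/role-name",  # placeholder IAM role
)
print(sql)
```

A single COPY loads every file under the prefix in parallel across the cluster's slices. That is why it beats issuing INSERT statements one row at a time.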

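Fortunately, AWS Glue does both steps for you: when the target is a Redshift table, Glue stages the data in the S3 directory given by redshift_tmp_dir and then runs COPY. You can write to Redshift like this: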

glueContext.write_dynamic_frame.from_catalog(
    frame = persons,  # the DynamicFrame to write (required), e.g. the one read from the source table
    database = "database-name",
    table_name = "table-name",
    redshift_tmp_dir = args["TempDir"],
    additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})

You should read this AWS doc carefully. https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-redshift.html