Am new to AWS Glue. I am trying to insert the redshift table thru Glue job, which has S3 crawler to read the csv file and redshift mapped crawler for table scheme.
The below job am trying to run where the create_date from S3 to insert in redshift column in timestamp. The values are always null.
Glue job:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "salesdatabase", table_name = "sales_timestamp_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "salesdatabase", table_name = "sales_timestamp_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("city", "string", "city", "string"), ("country", "string", "country", "string"), ("amount", "string", "amount", "string"), ("create_date", "string", "create_date", "timestamp")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("city", "string", "city", "string"), ("country", "string", "country", "string"), ("amount", "string", "amount", "string"), ("create_date", "string", "create_date", "timestamp")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["country", "amount", "city", "create_date"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["country", "amount", "city", "create_date"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "salesdatabase", table_name = "metricsdb_public_sales_csv", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "salesdatabase", table_name = "metricsdb_public_sales_csv", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
## @type: DataSink
## @args: [database = "salesdatabase", table_name = "metricsdb_public_sales_csv", redshift_tmp_dir = TempDir, transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "salesdatabase", table_name = "metricsdb_public_sales_csv", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()
Crawler schema details:
S3 schema from crawler
Column name Data type
- city string
- country string
- amount string
- create_date string
Table schema from crawler
Column name Data type
- city string
- country string
- amount string
- create_date timestamp
Any pointer please