I have a file in S3 that we are importing into Redshift using Glue. The crawler part is done.
One column holds datetime data, but it is not in a standard format, so the crawler could not identify it and marked it as a string.
I have now created the table in Redshift with that column's data type set to timestamp. While creating the job, where and what do I need to change in the script so that the string is converted to a Redshift timestamp?
The date format in the S3 file is 'yyyy.mm.dd HH:mi:ss', and the script is below.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "", table_name = "", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "", table_name = "", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("mrp", "long", "mrp", "decimal(10,2)"), ("mop", "double", "mop", "decimal(10,2)"), ("mop_update_timestamp", "string", "mop_update_timestamp", "timestamp"), ("special_price", "long", "special_price", "decimal(10,2)"), ("promotion_identifier", "string", "promotion_identifier", "string"), ("is_percentage_promotion", "string", "is_percentage_promotion", "string"), ("promotion_value", "string", "promotion_value", "decimal(10,2)"), ("max_discount", "long", "max_discount", "decimal(10,2)"), ("promotion_start_date", "string", "promotion_start_date", "timestamp"), ("promotion_end_date", "string", "promotion_end_date", "timestamp")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [ ("mrp", "long", "mrp", "decimal(10,2)"), ("mop", "double", "mop", "decimal(10,2)"), ("mop_update_timestamp", "string", "mop_update_timestamp", "timestamp"), ("special_price", "long", "special_price", "decimal(10,2)"), ("promotion_identifier", "string", "promotion_identifier", "string"), ("is_percentage_promotion", "string", "is_percentage_promotion", "string"), ("promotion_value", "string", "promotion_value", "decimal(10,2)"), ("max_discount", "long", "max_discount", "decimal(10,2)"), ("promotion_start_date", "string", "promotion_start_date", "timestamp"), ("promotion_end_date", "string", "promotion_end_date", "timestamp")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "", connection_options = {"dbtable": "", "database": ""}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "", connection_options = {"dbtable": "", "database": ""}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()
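
For reference, here is a minimal sketch of the kind of conversion I think is needed, placed between the datasource0 step and the ApplyMapping step. It assumes Spark's to_timestamp can parse the values using the Java-style pattern "yyyy.MM.dd HH:mm:ss" (my understanding of the equivalent of 'yyyy.mm.dd HH:mi:ss'); the column names come from my mapping above, but I am not sure this is the right place in the job to do it:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import to_timestamp

# Convert the DynamicFrame to a Spark DataFrame so to_timestamp can be used
df = datasource0.toDF()

# "yyyy.MM.dd HH:mm:ss" is the Spark/Java pattern for 'yyyy.mm.dd HH:mi:ss'
for col_name in ["mop_update_timestamp", "promotion_start_date", "promotion_end_date"]:
    df = df.withColumn(col_name, to_timestamp(df[col_name], "yyyy.MM.dd HH:mm:ss"))

# Convert back to a DynamicFrame for the rest of the job; the timestamp
# entries in the ApplyMapping call would then read
# ("mop_update_timestamp", "timestamp", "mop_update_timestamp", "timestamp"), etc.
datasource0 = DynamicFrame.fromDF(df, glueContext, "datasource0")

Is something like this the correct approach, or should the conversion happen inside ApplyMapping itself?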
("promotion_end_date", "promotion_end_date", "timestamp")in your script for timestamp columns? - Prabhakar Reddy