I have a file in S3 that we are importing into Redshift using Glue. The crawler part is done.

One column holds datetime values, but they are not in a standard format, so the crawler could not identify the type and marked the column as string.

I have created the table in Redshift with that column's data type set to timestamp. When creating the job, where and what do I need to change in the script so that the string is converted to a Redshift timestamp?

The date format in the S3 file is 'yyyy.mm.dd HH:mi:ss'.

The script is below.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "", table_name = "", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "", table_name = "", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("mrp", "long", "mrp", "decimal(10,2)"), ("mop", "double", "mop", "decimal(10,2)"), ("mop_update_timestamp", "string", "mop_update_timestamp", "timestamp"), ("special_price", "long", "special_price", "decimal(10,2)"), ("promotion_identifier", "string", "promotion_identifier", "string"), ("is_percentage_promotion", "string", "is_percentage_promotion", "string"), ("promotion_value", "string", "promotion_value", "decimal(10,2)"), ("max_discount", "long", "max_discount", "decimal(10,2)"), ("promotion_start_date", "string", "promotion_start_date", "timestamp"), ("promotion_end_date", "string", "promotion_end_date", "timestamp")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [ ("mrp", "long", "mrp", "decimal(10,2)"), ("mop", "double", "mop", "decimal(10,2)"), ("mop_update_timestamp", "string", "mop_update_timestamp", "timestamp"), ("special_price", "long", "special_price", "decimal(10,2)"), ("promotion_identifier", "string", "promotion_identifier", "string"), ("is_percentage_promotion", "string", "is_percentage_promotion", "string"), ("promotion_value", "string", "promotion_value", "decimal(10,2)"), ("max_discount", "long", "max_discount", "decimal(10,2)"), ("promotion_start_date", "string", "promotion_start_date", "timestamp"), ("promotion_end_date", "string", "promotion_end_date", "timestamp")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "", connection_options = {"dbtable": "", "database": ""}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "", connection_options = {"dbtable": "", "database": ""}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()
Can you try ("promotion_end_date", "promotion_end_date", "timestamp") in your script for timestamp columns? - Prabhakar Reddy
Hi, I will try this, but how will the program know the timestamp input format? I would expect to have to give the format somewhere. - Deepesh Uniyal
It should take care of that automatically. Let me know how it goes. - Prabhakar Reddy
Hi, it is not working; the column value in the database is blank. - Deepesh Uniyal
Have you tried stackoverflow.com/questions/54176546/… to convert to timestamp format before writing to Redshift? - Sandeep Fatangare

1 Answer

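Have you tried converting it to a DataFrame and parsing the column with an explicit format, since you have it in 'yyyy.mm.dd HH:mi:ss' format? A plain cast('timestamp') only recognizes ISO-style strings such as 'yyyy-MM-dd HH:mm:ss' and returns null for anything else, so to_timestamp with a pattern is the safer choice. In Spark's pattern syntax the month is MM and the minutes are mm. Something like this:

## Add these in order to use DynamicFrame.fromDF and to_timestamp
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import to_timestamp

## Make a DataFrame
df_datasource0 = datasource0.toDF()

## Add a column mop_update_timestamp_ts by parsing mop_update_timestamp with the source format
df_datasource0 = df_datasource0.withColumn('mop_update_timestamp_ts', to_timestamp(df_datasource0.mop_update_timestamp, 'yyyy.MM.dd HH:mm:ss'))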

## Transform the dataframe back to a dynamic frame again 
datasource0 = DynamicFrame.fromDF(df_datasource0, glueContext, "datasource0") 

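## Use the mop_update_timestamp_ts column instead, as below.
## If promotion_start_date and promotion_end_date use the same string format, convert them the same way.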
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [ ("mrp", "long", "mrp", "decimal(10,2)"), ("mop", "double", "mop", "decimal(10,2)"), ("mop_update_timestamp_ts", "timestamp", "mop_update_timestamp", "timestamp"), ("special_price", "long", "special_price", "decimal(10,2)"), ("promotion_identifier", "string", "promotion_identifier", "string"), ("is_percentage_promotion", "string", "is_percentage_promotion", "string"), ("promotion_value", "string", "promotion_value", "decimal(10,2)"), ("max_discount", "long", "max_discount", "decimal(10,2)"), ("promotion_start_date", "string", "promotion_start_date", "timestamp"), ("promotion_end_date", "string", "promotion_end_date", "timestamp")], transformation_ctx = "applymapping1")
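
Before running the job, it may help to check the format against a few sample values locally. The sketch below uses only the Python standard library. The helper name parse_source_timestamp and the sample values are made up for illustration, and it assumes 'yyyy.mm.dd HH:mi:ss' means a 4-digit year, month, day, and 24-hour time. The strptime format '%Y.%m.%d %H:%M:%S' corresponds to the pattern 'yyyy.MM.dd HH:mm:ss' that Spark's to_timestamp would take.

```python
from datetime import datetime

# strptime equivalent of the source format 'yyyy.mm.dd HH:mi:ss'
# (assumed to mean 4-digit year, month, day, 24-hour time).
SOURCE_FORMAT = "%Y.%m.%d %H:%M:%S"


def parse_source_timestamp(raw):
    """Return a datetime for a source string, or None if it cannot be parsed."""
    if raw is None or not raw.strip():
        return None
    try:
        return datetime.strptime(raw.strip(), SOURCE_FORMAT)
    except ValueError:
        return None


# Hypothetical sample values, not taken from the real file.
samples = ["2019.01.15 13:45:10", "2019-01-15 13:45:10", ""]
for value in samples:
    parsed = parse_source_timestamp(value)
    # isoformat(sep=" ") renders the value as 'YYYY-MM-DD HH:MM:SS'.
    print(repr(value), "->", parsed.isoformat(sep=" ") if parsed else None)
```

Any value that comes back as None here is likely to end up blank in the timestamp column as well, so it is worth cleaning those rows in the source file first.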

Let me know if it works for you