
I am new to AWS Glue. I am trying to insert into a Redshift table through a Glue job, using an S3 crawler to read the CSV file and a Redshift crawler for the table schema.

In the job below, the create_date column from S3 should be inserted into a Redshift timestamp column, but the inserted values are always null.

Glue job:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "salesdatabase", table_name = "sales_timestamp_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "salesdatabase", table_name = "sales_timestamp_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("city", "string", "city", "string"), ("country", "string", "country", "string"), ("amount", "string", "amount", "string"), ("create_date", "string", "create_date", "timestamp")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("city", "string", "city", "string"), ("country", "string", "country", "string"), ("amount", "string", "amount", "string"), ("create_date", "string", "create_date", "timestamp")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["country", "amount", "city", "create_date"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["country", "amount", "city", "create_date"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "salesdatabase", table_name = "metricsdb_public_sales_csv", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "salesdatabase", table_name = "metricsdb_public_sales_csv", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
## @type: DataSink
## @args: [database = "salesdatabase", table_name = "metricsdb_public_sales_csv", redshift_tmp_dir = TempDir, transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "salesdatabase", table_name = "metricsdb_public_sales_csv", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()

Crawler schema details:

S3 schema from crawler

Column name Data type

  1. city string
  2. country string
  3. amount string
  4. create_date string

Table schema from crawler
Column name Data type

  1. city string
  2. country string
  3. amount string
  4. create_date timestamp

Any pointers, please?

Can you confirm what the timestamp format is and, if possible, update your question with a few sample records? Also, can you try modifying create_date in applymapping to ("create_date", "create_date", "timestamp")? – Prabhakar Reddy
Thanks for the info. The CSV file has the following data:

city country amount create_date
City1 Country1 $20000 2018/06/15 12:08:00
City2 Country2 $5000 2019/05/18 08:20:00
City3 Country3 $14000 2020/02/20 12:59:00
City4 Country4 $4000 2019/09/18 11:30:00
City5 Country5 $8000 2020/12/31 22:23:00
City6 Country6 $9000 2019/11/13 09:10:00
City7 Country7 $3500 2020/09/12 20:28:00
City8 Country8 $650 2020/06/26 18:45:00

– saleem javeed
I even tried your suggestion as below, but with no success: applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "city", "string"), ("col1", "string", "country", "string"), ("col2", "string", "amount", "string"), ("col3", "string", "created_date", "timestamp")], transformation_ctx = "applymapping1") – saleem javeed
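(For context: Spark's implicit string-to-timestamp cast, which is what the ApplyMapping string → timestamp conversion relies on, only recognises ISO-style values such as 2018-06-15 12:08:00; anything else becomes null, which matches the symptom above. A rough plain-Python sketch of that behaviour; the cast_to_timestamp helper is illustrative only, not Glue's actual code:)

```python
from datetime import datetime

def cast_to_timestamp(value):
    """Illustrative stand-in for Spark's string -> timestamp cast:
    accepts ISO-style values, returns None (i.e. null) for anything else."""
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            pass
    return None

print(cast_to_timestamp("2018-06-15 12:08:00"))  # parses fine
print(cast_to_timestamp("2018/06/15 12:08:00"))  # None -> null in Redshift
```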

1 Answer

I was able to resolve this.

Just convert the DynamicFrame to a Spark DataFrame and apply the transformation there.

from pyspark.sql.functions import to_timestamp, col
from awsglue.dynamicframe import DynamicFrame

# Convert the dynamic frame to a data frame to use standard PySpark functions
data_frame = datasource0.toDF()
data_frame.show()

# Parse create_date with a pattern that matches the sample data
# (e.g. 2018/06/15 12:08:00 -> "yyyy/MM/dd HH:mm:ss")
data_frame = data_frame.withColumn("create_date",
          to_timestamp(col("create_date"), "yyyy/MM/dd HH:mm:ss"))
data_frame.show()


# Convert back to a dynamic frame for the Glue sink
dynamic_frame_write = DynamicFrame.fromDF(data_frame, glueContext, "dynamic_frame_write")

Then pass dynamic_frame_write as the frame in the datasink5 write (glueContext.write_dynamic_frame.from_catalog).
It worked for me.
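Note that the sample values from the question's comments (e.g. 2018/06/15 12:08:00) are year-first, so the format string passed to to_timestamp has to follow the Spark pattern "yyyy/MM/dd HH:mm:ss" exactly, or the result is again null. A quick sanity check of the equivalent strptime pattern in plain Python:

```python
from datetime import datetime

# Sample create_date values quoted in the question's comments
samples = ["2018/06/15 12:08:00", "2020/12/31 22:23:00"]

# strptime equivalent of the Spark pattern "yyyy/MM/dd HH:mm:ss"
parsed = [datetime.strptime(s, "%Y/%m/%d %H:%M:%S") for s in samples]
print(parsed[0])  # 2018-06-15 12:08:00
```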