I have the script below to move all columns of tables of varying sizes, 90 million to 250 million records, from an on-premises Oracle database to AWS Redshift. The script also appends several audit columns:
add_metadata1 = custom_spark_df.withColumn('line_number', F.row_number().over(Window.orderBy(lit(1))))
add_metadata2 = add_metadata1.withColumn('source_system', lit(source_system))
add_metadata3 = add_metadata2.withColumn('input_filename', lit(input_filename))
add_metadata4 = add_metadata3.withColumn('received_timestamp', lit(received_timestamp))
add_metadata5 = add_metadata4.withColumn('received_timestamp_unix', lit(received_timestamp_unix))
add_metadata6 = add_metadata5.withColumn('eff_data_date', lit(eff_data_date))
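For context, my understanding is that row_number over a window with no partitionBy forces Spark to move every row into a single partition (Spark warns about moving all data to a single partition and the performance degradation that can cause), which I suspect matters at this scale. A minimal, self-contained illustration of that pattern on toy data (not the production tables):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 1000000)  ## stand-in for custom_spark_df
numbered = df.withColumn('line_number', F.row_number().over(Window.orderBy(lit(1))))
print(numbered.rdd.getNumPartitions())  ## expected to print 1 - all rows end up in one partition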
At present, the long-running job hits a connection timeout after 3-5 hours and has therefore never finished:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## Start - Custom block of imports ##
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import datetime
from pyspark.sql.functions import lit
## End - Custom block of imports ##
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "metadatastore", table_name = "TableName", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("...MAPPINGS OUTLINED...")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## Start - Custom block for creation of metadata columns ##
now = datetime.datetime.now()
##line_number = '1'
## Remember to update source_system (if needed) and input_filename
source_system = 'EDW'
input_filename = 'TableName'
received_timestamp = datetime.datetime.strptime(now.strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")  ## current timestamp truncated to whole seconds
received_timestamp_unix = int((now - datetime.datetime(1970,1,1)).total_seconds())  ## seconds since the Unix epoch
eff_data_date = datetime.datetime.strptime(now.strftime("%Y-%m-%d"), "%Y-%m-%d").date()  ## current date only
## Update to the last dataframe used
## Do not forget to update write_dynamic_frame to use custom_dynamic_frame for the frame name and add schema to the dbtable name
custom_spark_df = dropnullfields3.toDF()
add_metadata1 = custom_spark_df.withColumn('line_number', F.row_number().over(Window.orderBy(lit(1))))
add_metadata2 = add_metadata1.withColumn('source_system', lit(source_system))
add_metadata3 = add_metadata2.withColumn('input_filename', lit(input_filename))
add_metadata4 = add_metadata3.withColumn('received_timestamp', lit(received_timestamp))
add_metadata5 = add_metadata4.withColumn('received_timestamp_unix', lit(received_timestamp_unix))
add_metadata6 = add_metadata5.withColumn('eff_data_date', lit(eff_data_date))
custom_dynamic_frame = DynamicFrame.fromDF(add_metadata6, glueContext, "add_metadata6")
## End - Custom block for creation of metadata columns ##
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = custom_dynamic_frame, catalog_connection = "Redshift", connection_options = {"dbtable": "schema_name.TableName", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()
How can this script be improved to reduce the run time and allow it to run to completion?
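For reference, one variation I have sketched but not yet tested (it reuses the same imports and variables as the script above) replaces the global row_number with monotonically_increasing_id, on the assumption that line_number does not have to be a gapless, ordered sequence:

## Untested sketch: same metadata columns, but line_number comes from
## monotonically_increasing_id(), which is unique per row but NOT consecutive,
## and avoids pulling every row into a single partition for the window.
custom_spark_df = dropnullfields3.toDF()
add_metadata1 = custom_spark_df.withColumn('line_number', F.monotonically_increasing_id())
add_metadata2 = (add_metadata1
    .withColumn('source_system', lit(source_system))
    .withColumn('input_filename', lit(input_filename))
    .withColumn('received_timestamp', lit(received_timestamp))
    .withColumn('received_timestamp_unix', lit(received_timestamp_unix))
    .withColumn('eff_data_date', lit(eff_data_date)))
custom_dynamic_frame = DynamicFrame.fromDF(add_metadata2, glueContext, "add_metadata2")

I have not been able to verify this end to end because the current job never completes.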