0
votes

I am fairly new to AWS and I am currently exploring it. I was hoping to get an insight or suggestion on the best way to implement the job.

I wanted to get data from multiple mysql tables.

  • user_transaction
  • user_loans
  • promo_offers

To get the final table I found the following 2 ways.

Method 1:

  1. Create Catalog for each table then,
user_transaction = glueContext.create_dynamic_frame.from_catalog(
           database="Org_Data_Lake",
           table_name="user_transaction"
       transformation_ctx = "user_transaction", 
       additional_options = {"jobBookmarkKeys":["transaction_id"],"jobBookmarksKeysSortOrder":"asc"})

user_loans = glueContext.create_dynamic_frame.from_catalog(
           database="Org_Data_Lake",
           table_name="user_loans")
promo_offers = glueContext.create_dynamic_frame.from_catalog(
           database="Org_Data_Lake",
           table_name="promo_offers")
  1. Then apply Join.apply to get final
final_history = Join.apply(user_transaction,
                       Join.apply(user_loans, promo_offers, 'offer_id', 'offer_id'),
                       'user_loan_id', 'user_loan_id').drop_fields([.......])
  1. Finally, put all data into S3
glueContext.write_dynamic_frame.from_options(frame = final_history,
          connection_type = "s3",
          connection_options = {"path": "s3://glue-sample-target/output-dir/final_history"},
          format = "parquet")

Method 2:

  1. Get the final_history data ready at once,
query = "t1.transaction_id, t1.status, t2.loan_status, t3.offer_amount 
     FROM user_transaction AS t1 
     JOIN user_loans AS t2 ON (t2.user_loan_id = t1.user_loan_id) 
     JOIN promo_offers AS t3 ON (t3.offer_id = t2.offer_id) 
     WHERE t1.created_at > '2020-01-01 00:00:00' LIMIT 10) as tmp"

final_history_data = glueContext.read.format("jdbc")
             .option("driver", jdbc_driver_name)
             .option("url", db_url)
             .option("dbtable", query)
             .option("user", db_username)
             .option("password", db_password).load()

final_history = DynamicFrame.fromDF(final_history_data, glueContext, "final_history")
  1. Finally put all data into S3
glueContext.write_dynamic_frame.from_options(frame = final_history,
          connection_type = "s3",
          connection_options = {"path": "s3://glue-sample-target/output-dir/final_history"},
          format = "parquet")

Which method is the best way and how to apply jobBookmarkKeys to method 2??

1

1 Answers

0
votes

Your job might fail if you are using Method 1 which you need to handle:

  1. Consider there is no new data returned for dynamic_frame user_transaction because you enabled jobbookmark which will fail step 2 as it is empty.

  2. So you need to conditional check that if it is empty then skip the processing and exit.

In Method 2 you cannot leverage jobbookmark as transformation_ctx is missing and cannot be included as you are reading directly from database by pushing down the query down to engine.The transformation_ctx parameter is used to identify state information within a job bookmark for the given operator. Specifically, AWS Glue uses transformation_ctx to index the key to the bookmark state. Refer to this to know more.

Comparing these two methods:

  1. Method 1 will help your jobs to run faster if there is no data to process between job runs as job bookmark is enabled.

  2. Method 2 will give you better performance if your DB has some good hardware which can return the result of the query with less turn around time. But if there is no new data between multiple job runs your job will still send the query to DB engine where as in the later case it will not.

  3. Also considering the number of new records between job runs you can decide to choose either job bookmark or pushing down the query to DB engine.