
I'm currently automating my data lake ingestion process. Data lands in my raw zone (an S3 bucket). The bucket contains 27 folders, each corresponding to a database, and each folder holds some number of CSV files, each corresponding to a table. An S3 event (All object create events) triggers a Lambda function which crawls my raw zone, and I am able to see every table successfully. Upon completion I'd like an ETL job that moves the data into the processed zone, converting it to Parquet; however, given the number of tables I have, I don't want to manually create a job specifying each table as a "source".
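For context, the trigger side can be a small Lambda handler along the lines of the sketch below, which simply starts the raw-zone crawler on each object-create event. The crawler name here is a placeholder, not the one from my actual setup.

import boto3

glue = boto3.client("glue")

# Hypothetical crawler name for the raw zone; substitute your own.
RAW_ZONE_CRAWLER = "raw-zone-crawler"

def lambda_handler(event, context):
    # Start the raw-zone crawler when an S3 object-create event fires.
    try:
        glue.start_crawler(Name=RAW_ZONE_CRAWLER)
    except glue.exceptions.CrawlerRunningException:
        # The crawler is already running; a later run will pick up the new object.
        pass
    return {"started": RAW_ZONE_CRAWLER}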

I demoed my automation by uploading a single CSV file to my raw zone; the crawler ran, and then the ETL job ran, converting the raw-zone table to Parquet and landing it in my processed zone. When I dropped in my second table, the crawler successfully recognized it as a new table in my raw zone, but in my processed zone the job merged its data into the first table's schema (even though the two schemas are completely different).

I would expect the following: 1) the crawler recognizes the CSV as a table, 2) the Glue ETL job converts the file to Parquet, 3) the crawler recognizes the Parquet file(s) as a single table.

The following code highlights the problem I was facing: the data source specified is a single table (folder), so everything within that folder was assumed to share the same schema.

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "APPLICATION_XYZ", table_name = "RAW_ZONE_w1cqzldd5jpe", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("lpep_pickup_datetime", "string", "lpep_pickup_datetime", "string"), ("lpep_dropoff_datetime", "string", "lpep_dropoff_datetime", "string"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("ratecodeid", "long", "ratecodeid", "long"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("ehail_fee", "string", "ehail_fee", "string"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double"), ("payment_type", "long", "payment_type", "long"), ("trip_type", "long", "trip_type", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
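For completeness, the generated script continues by applying that one hard-coded mapping to everything the datasource reads, which is why a second file with a different schema gets forced into the first table's mapping. A rough sketch of that step (columns abbreviated from the @args comment above) looks like this:

from awsglue.transforms import ApplyMapping

# One fixed mapping, generated for the first table, applied to every file
# under the RAW_ZONE_w1cqzldd5jpe folder (remaining columns as in the
# @args comment above).
applymapping1 = ApplyMapping.apply(
    frame = datasource0,
    mappings = [
        ("vendorid", "long", "vendorid", "long"),
        ("lpep_pickup_datetime", "string", "lpep_pickup_datetime", "string"),
        ("trip_distance", "double", "trip_distance", "double"),
        # ... remaining columns from the @args comment ...
    ],
    transformation_ctx = "applymapping1")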
Why not create an external table in Athena instead of creating a crawler, in case you already know the schema? This would be cost effective as well, since Athena doesn't charge for DDL queries. - Harsh Bafna
The problem is we don't have the schema information for all tables. - midiman
Could you share sample code of how you are dumping the processed data back to S3? - Harsh Bafna
@HarshBafna Upon reviewing the code, I think I see the problem. My job was referencing a specific table (parent folder) as a data source. I'll update my question with the code. - midiman
That's great to hear :-). Let me know if you need any more help. :-) - Harsh Bafna

1 Answer


I created an ETL job with the following code to loop through the tables in my database and write each one as Parquet to a new folder with the same name (so I can crawl the table and query it with Athena).

import boto3

# Glue catalog client, used to enumerate the tables the raw-zone crawler created
client = boto3.client('glue')

databaseName = 'DATABASE'
tables = client.get_tables(DatabaseName = databaseName)
tableList = tables['TableList']

# Read each catalog table on its own and write it to its own Parquet folder
for table in tableList:
    tableName = table['Name']
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = databaseName, table_name = tableName, transformation_ctx = "datasource0")
    datasink4 = glueContext.write_dynamic_frame.from_options(frame = datasource0, connection_type = "s3", connection_options = {"path": "s3://processed-45ah4xoyqr1b/Application1/" + tableName + "/"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
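To finish the flow (crawl the Parquet output so each folder becomes its own table queryable in Athena), a crawler over the processed prefix can be created and started with boto3, roughly as sketched below. The crawler name, IAM role, and target database are placeholders, not part of the original job.

# Sketch: register and run a crawler over the processed zone so each
# tableName folder is cataloged as its own Parquet table.
processed_crawler = 'processed-zone-crawler'
client.create_crawler(
    Name = processed_crawler,
    Role = 'GlueServiceRoleForProcessedZone',    # hypothetical IAM role
    DatabaseName = 'APPLICATION_XYZ_processed',  # hypothetical catalog database
    Targets = {'S3Targets': [{'Path': 's3://processed-45ah4xoyqr1b/Application1/'}]})
client.start_crawler(Name = processed_crawler)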