I'm currently automating my data lake ingestion. Data arrives in my Raw zone (an S3 bucket) containing 27 folders, each corresponding to a database; each folder holds some number of CSV files, each corresponding to a table. An S3 event (All object create events) triggers a Lambda function that crawls my raw zone, and I am able to see every table successfully. On completion I'd like to create an ETL job that moves the data into the processed zone, converting it to Parquet. However, given the number of tables I have, I don't want to manually create a job specifying each table as a "source".
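For reference, the Lambda that the S3 event fires only needs to kick off the crawler. Roughly what such a handler looks like, assuming the crawler is named "raw-zone-crawler" (a placeholder for the real name):

import boto3

glue = boto3.client("glue")

def lambda_handler(event, context):
    # Fired by the S3 "All object create events" notification on the raw bucket.
    # "raw-zone-crawler" is a placeholder for the actual crawler name.
    try:
        glue.start_crawler(Name="raw-zone-crawler")
    except glue.exceptions.CrawlerRunningException:
        # A previous upload already started the crawler; nothing to do.
        pass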
I demoed my automation by uploading a single CSV file to the raw zone; the crawler ran, and then the ETL job ran, converting the raw zone table to Parquet and landing it in my processed zone. When I dropped my second table, the crawler successfully recognized it as a new table in the raw zone, but in the processed zone its data was merged into the first table's schema (even though the two schemas are completely different).
I would expect the following:
1) the crawler to recognize the CSV as a table
2) the Glue ETL job to convert the file to Parquet
3) the crawler to recognize the Parquet file(s) as a single table
The following code highlights the problem I was facing: the data source specified is a table (in reality, a folder), so everything within that folder was assumed to have the same schema.
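# table_name points at the raw zone *folder*, so every CSV inside it is read under one inferred schema: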
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "APPLICATION_XYZ", table_name = "RAW_ZONE_w1cqzldd5jpe", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("lpep_pickup_datetime", "string", "lpep_pickup_datetime", "string"), ("lpep_dropoff_datetime", "string", "lpep_dropoff_datetime", "string"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("ratecodeid", "long", "ratecodeid", "long"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("ehail_fee", "string", "ehail_fee", "string"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double"), ("payment_type", "long", "payment_type", "long"), ("trip_type", "long", "trip_type", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
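What I want instead is to enumerate every table the crawler registered and convert each one separately, so no table is hard-coded as a source. A rough sketch of that loop, assuming the catalog database is APPLICATION_XYZ as above and using a placeholder processed-zone bucket path:

import boto3
from awsglue.context import GlueContext
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())
glue = boto3.client("glue")

# Walk every table the crawler created in the catalog database.
paginator = glue.get_paginator("get_tables")
for page in paginator.paginate(DatabaseName="APPLICATION_XYZ"):
    for table in page["TableList"]:
        name = table["Name"]
        frame = glueContext.create_dynamic_frame.from_catalog(
            database="APPLICATION_XYZ", table_name=name)
        # Each table lands under its own prefix in the processed zone,
        # so a second crawler can pick each one up as a single table.
        # "s3://processed-zone-bucket" is a placeholder path.
        glueContext.write_dynamic_frame.from_options(
            frame=frame,
            connection_type="s3",
            connection_options={"path": f"s3://processed-zone-bucket/{name}/"},
            format="parquet")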