1 vote

The current set-up:

  • S3 location with JSON files. All files are stored in the same location (no day/month/year structure).

  • A Glue Crawler reads the data into a catalog table.

  • A Glue ETL job transforms the data and stores it as parquet tables in S3.

  • A Glue Crawler reads the parquet tables from S3 and stores them in a new table that gets queried by Athena.

What I want to achieve is (1) for the parquet output to be partitioned by day and (2) for all the parquet data for one day to end up in the same file. Currently a separate parquet file is created for each JSON file.

How would I go about it?

One thing to mention: there is a datetime column in the data, but it's a unix epoch timestamp. I would probably need to convert that to a 'year/month/day' format, otherwise I'm assuming a partition will be created for each file again.

Thanks a lot for your help!!


3 Answers

7 votes

Convert Glue's DynamicFrame into a Spark DataFrame to add the year/month/day columns and repartition. Repartitioning to a single partition ensures that only one file is written into each folder, but it may slow down job performance.

Here is the Python code:

from pyspark.sql.functions import col, year, month, dayofmonth, to_date, from_unixtime
from awsglue.dynamicframe import DynamicFrame

...

df = dynamicFrameSrc.toDF()

repartitioned_with_new_columns_df = (
    df
    # convert the unix epoch column to a date, then derive the partition columns
    .withColumn("date_col", to_date(from_unixtime(col("unix_time_col"))))
    .withColumn("year", year(col("date_col")))
    .withColumn("month", month(col("date_col")))
    .withColumn("day", dayofmonth(col("date_col")))
    .drop(col("date_col"))
    # a single Spark partition means a single output file per folder
    .repartition(1)
)

dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")

datasink = glueContext.write_dynamic_frame.from_options(
    frame = dyf,
    connection_type = "s3",
    connection_options = {
        "path": "s3://yourbucket/data",
        "partitionKeys": ["year", "month", "day"]
    },
    format = "parquet",
    transformation_ctx = "datasink"
)
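
Because partitionKeys produces Hive-style key=value folders, the output layout should look roughly like this (hypothetical dates and file names, path taken from the example above):

s3://yourbucket/data/year=2019/month=7/day=30/part-00000-xxxx.snappy.parquet
s3://yourbucket/data/year=2019/month=7/day=31/part-00000-xxxx.snappy.parquet

With repartition(1), each day folder should end up containing a single parquet file.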

Note that from pyspark.sql.functions import col can give a reference error; this shouldn't be a problem, as explained here.

3 votes

I cannot comment, so I am writing this as an answer.

I used Yuriy's code, and a couple of things needed adjustment (a combined snippet follows the list):

  • missing brackets

df = dynamicFrameSrc.toDF()

  • after toDF() I had to add select("*"), otherwise the schema was empty

df.select("*").withColumn("date_col", to_date(from_unixtime(col("unix_time_col"))))
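
Putting both adjustments together, the beginning of the pipeline would look roughly like this (a sketch reusing the column names from Yuriy's answer):

from pyspark.sql.functions import col, year, month, dayofmonth, to_date, from_unixtime

# Sketch combining both fixes: toDF() called with brackets, then select("*")
# so the schema is populated before the column transforms.
df = dynamicFrameSrc.toDF()

repartitioned_with_new_columns_df = (
    df.select("*")
    .withColumn("date_col", to_date(from_unixtime(col("unix_time_col"))))
    .withColumn("year", year(col("date_col")))
    .withColumn("month", month(col("date_col")))
    .withColumn("day", dayofmonth(col("date_col")))
    .drop(col("date_col"))
    .repartition(1)
)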

1 vote

To achieve this in AWS Glue Studio:

You will need to write a custom transform function to convert the datetime field to a date. There is the extra step of converting the result back to a DynamicFrameCollection.

In Python:

def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    # Take the first DynamicFrame in the collection and convert it to a Spark DataFrame
    df = dfc.select(list(dfc.keys())[0]).toDF()
    # Cast the datetime field to a date so it can be used as a partition key
    df_with_date = df.withColumn('date_field', df['datetime_field'].cast('date'))
    # Convert back to a DynamicFrame and wrap it in a DynamicFrameCollection
    glue_df = DynamicFrame.fromDF(df_with_date, glueContext, "transform_date")
    return DynamicFrameCollection({"CustomTransform0": glue_df}, glueContext)
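
If the datetime field actually holds a unix epoch (as in the question) rather than a timestamp, the plain cast to date may not work; a minimal sketch of the variant, assuming a hypothetical column named datetime_field that stores seconds:

from pyspark.sql.functions import col, to_date, from_unixtime

# Hypothetical variant for an epoch-in-seconds column: convert to a timestamp
# first, then truncate to a date that can serve as the partition key.
df_with_date = df.withColumn(
    'date_field',
    to_date(from_unixtime(col('datetime_field')))
)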

You would then have to edit the custom transformer schema to include that new date field you just created.

You can then use the "data target" node to write the data out and select that new date field to use as a partition key.

Video step-by-step walkthrough