3 votes

I have data in an S3 bucket containing many JSON files, organized somewhat like this:

s3://bucket1/news/year=2018/month=01/day=01/hour=xx/

The day partition contains multiple hour=xx partitions, one for each hour of the day. I run a Glue ETL job on the files in the day partition and create a DynamicFrame with create_dynamic_frame_from_options. I then apply some mapping using ApplyMapping.apply, which works like a charm.
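For context, roughly what that flow looks like (a sketch only; the bucket path, mapping tuple, and variable names here are placeholders, not my actual job):

from awsglue.context import GlueContext
from awsglue.transforms import ApplyMapping
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Read all JSON files under one day partition straight from S3
dynamicFrame = glueContext.create_dynamic_frame_from_options(
    connection_type='s3',
    connection_options={'paths': ['s3://bucket1/news/year=2018/month=01/day=01/'], 'recurse': True},
    format='json')

# Rename/retype fields; the mapping tuple below is just an example
mapped = ApplyMapping.apply(
    frame=dynamicFrame,
    mappings=[('title', 'string', 'title', 'string')])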

However, I would then like to create a new column containing the hour value, based on the partition each file came from. I can use Spark to create a new column with a constant, as below, but how do I make this column use the partition value as its source?

from pyspark.sql.functions import lit
df1 = dynamicFrame.toDF().withColumn("update_date", lit("new column value"))

Edit 1

The AWS article on working with partitioned data uses a Glue crawler before creating the DynamicFrame and then creates the DynamicFrame from the Glue Data Catalog. I need to create the DynamicFrame directly from the S3 source.

4 votes

Thank you, I have seen the article; however, they use a Glue crawler before creating the DynamicFrame and then create the DynamicFrame from a Glue catalog. I need to create the DynamicFrame directly from the S3 source. – Cactus

4 Answers

3 votes

I am not really following what you need to do. Don't you already have an hour value if your files are partitioned on it, or do you only get it when you use create_dynamic_frame.from_catalog? Can you do df1["hour"] on the DataFrame, or select_fields(["hour"]) on the DynamicFrame?
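For reference, those two lookups would be written roughly like this (assuming the frame actually exposes an hour field, which is the open question here):

# On the DynamicFrame: keep just the named fields (returns a new DynamicFrame)
hours_dyf = dynamicFrame.select_fields(['hour'])

# On the Spark DataFrame after toDF(): reference the column directly
df1 = dynamicFrame.toDF()
df1.select(df1['hour']).show()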

You do not need to import any libraries if your data is partitioned on a timestamp in yyyymmddhh format; you can do this with plain column operations in Spark.

Example code. First I create some values to populate a DataFrame, then derive the new column as below.

# Sample data: a yyyymmddhh string plus some other value per row
df_values = [('2019010120',1),('2019010121',2),('2019010122',3),('2019010123',4)]
df = spark.createDataFrame(df_values,['yyyymmddhh','some_other_values'])
# Slicing a Column maps to substr(): [9:10] starts at 1-based position 9,
# which yields the trailing hour digits ("20", "21", ...)
df_new = df.withColumn("hour", df["yyyymmddhh"][9:10])
df_new.show()
+----------+-----------------+----+
|yyyymmddhh|some_other_values|hour|
+----------+-----------------+----+
|2019010120|                1|  20|
|2019010121|                2|  21|
|2019010122|                3|  22|
|2019010123|                4|  23|
+----------+-----------------+----+
0 votes

I'm not familiar with AWS Glue, but if the linked approach doesn't work for your case, you can try the following workaround:

Get the file name using input_file_name, then use regexp_extract to get the partition column you want from the file name:

from pyspark.sql.functions import input_file_name, regexp_extract

# Pull the value between "hour=" and the next "/" out of each row's source file path
df2 = df1.withColumn("hour", regexp_extract(input_file_name(), "hour=(.+?)/", 1))
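Applied to the frame from the question, the same pattern could look like this; input_file_name() returns the full S3 object key for each row, so any partition in the path can be pulled out the same way (the extra day column is just an example):

from pyspark.sql.functions import input_file_name, regexp_extract

df1 = dynamicFrame.toDF()

# Each row's source path looks like .../year=2018/month=01/day=01/hour=05/<file>.json
df2 = (df1
       .withColumn('hour', regexp_extract(input_file_name(), 'hour=(.+?)/', 1))
       .withColumn('day', regexp_extract(input_file_name(), 'day=(.+?)/', 1)))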
0 votes

As I understand your problem, you would like to build a DataFrame for a given day with the hours as partitions. Generally, if you use Apache Hive-style partitioned paths and your files share the same schema, you should have no problem using

ds = glueContext.create_dynamic_frame.from_options(
    's3',
    {'paths': ['s3://bucket1/news/year=2018/month=01/day=01/']},
    'json')

or...

df = spark.read.option("mergeSchema", "true").json('s3://bucket1/news/year=2018/month=01/day=01/')

So if it doesn't work, check whether you are using Apache Hive-style partitioned paths and whether your files share the same schema.
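A quick way to sanity-check that, sketched with the plain Spark reader: if the hour=xx directories are picked up by partition discovery, hour shows up as an ordinary column.

df = spark.read.json('s3://bucket1/news/year=2018/month=01/day=01/')

# Partition discovery on the hour=xx subdirectories should add an 'hour' column
df.printSchema()
df.select('hour').distinct().show()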

You can also try the boto3 library in Glue (it may be useful to you):

import boto3
s3 = boto3.resource('s3')
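For example, a sketch of how boto3 could list which hour= partitions exist under a day prefix (bucket name and prefix are placeholders):

import boto3

s3 = boto3.client('s3')

# Treat '/' as a delimiter so each hour=xx/ prefix comes back as a CommonPrefix
resp = s3.list_objects_v2(
    Bucket='bucket1',
    Prefix='news/year=2018/month=01/day=01/',
    Delimiter='/')
hour_prefixes = [p['Prefix'] for p in resp.get('CommonPrefixes', [])]
print(hour_prefixes)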

Useful links:

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html

0 votes

"...AWS Glue does not include the partition columns in the DynamicFrame—it only includes the data."

We have to load the S3 key into a new column and decode the partitions programmatically to create the columns we want in the DynamicFrame/DataFrame. Once created, we can use them as needed.
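One way to do that decoding, sketched with Spark's input_file_name rather than any Glue-specific helper (and subject to the caveat in the PS below):

from pyspark.sql.functions import input_file_name, regexp_extract

# Attach the S3 key of each row's source file as a column
df = dynamicFrame.toDF().withColumn('s3_key', input_file_name())

# Decode each Hive-style key=value segment of the path into its own column
for part in ['year', 'month', 'day', 'hour']:
    df = df.withColumn(part, regexp_extract('s3_key', part + '=(.+?)/', 1))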

PS: I have tested this against Parquet files. It doesn't work for JSON files.
