13
votes

I have an S3 bucket that is constantly being filled with new data, and I am using Athena and Glue to query that data. The problem is that if Glue doesn't know a new partition has been created, it doesn't know it needs to search there. Making an API call to run the Glue crawler each time I need a new partition is too expensive, so the best solution seems to be to tell Glue directly that a new partition has been added, i.e. to create the new partition in its properties table. I looked through the AWS documentation but had no luck. I am using Java with AWS. Any help?

4
Too expensive in terms of computation or money? - botchniaque
Money-wise; it's not an operation that uses that much CPU. - Gudzo
Then if you know when new partitions are added, try option #3 from my answer. - botchniaque
@Gudzo - Can you accept my answer if it helped? - conetfun
@Gudzo Hello? Checking in to see if you can accept my solution. - conetfun

4 Answers

6
votes
  1. You can configure your Glue crawler to get triggered on a schedule, e.g. every 5 mins.
  2. You can create a Lambda function which will either run on a schedule or be triggered by an event from your bucket (e.g. a putObject event); that function could call Athena to discover partitions:

    import boto3

    athena = boto3.client('athena')

    def lambda_handler(event, context):
        # MSCK REPAIR TABLE scans the table location and registers missing partitions
        athena.start_query_execution(
            QueryString="MSCK REPAIR TABLE mytable",
            ResultConfiguration={
                'OutputLocation': "s3://some-bucket/_athena_results"
            }
        )
    
  3. Use Athena to add partitions manually. You can also run SQL queries via the API, as in my Lambda example.

    Example from the Athena manual:

    ALTER TABLE orders ADD
      PARTITION (dt = '2016-05-14', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_14_May_2016'
      PARTITION (dt = '2016-05-15', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_15_May_2016';
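
If you already know which partitions have arrived, the statement above can be generated and submitted programmatically. The following is only a sketch under assumptions: `build_add_partition_sql` and `add_partitions` are hypothetical helper names, the `athena` argument is assumed to be a `boto3.client('athena')`, and the partition values are interpolated without escaping, so they should come from trusted input. It uses `ADD IF NOT EXISTS` so that re-running it for an existing partition should not fail.

```python
def build_add_partition_sql(table, partitions):
    """Build an ALTER TABLE ... ADD IF NOT EXISTS PARTITION statement.

    `partitions` is a list of (spec, location) pairs, where `spec` maps
    partition column names to values (hypothetical input shape).
    """
    clauses = []
    for spec, location in partitions:
        cols = ", ".join("{} = '{}'".format(k, v) for k, v in spec.items())
        clauses.append("PARTITION ({}) LOCATION '{}'".format(cols, location))
    return "ALTER TABLE {} ADD IF NOT EXISTS\n  {}".format(table, "\n  ".join(clauses))


def add_partitions(athena, table, partitions, database, output_location):
    """Submit the statement through an Athena client (assumed boto3.client('athena'))."""
    return athena.start_query_execution(
        QueryString=build_add_partition_sql(table, partitions),
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_location},
    )
```

Keeping the SQL builder separate from the API call makes it easy to check the generated statement before sending it to Athena.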
    
29
votes

You may want to use the batch_create_partition() Glue API to register new partitions. It doesn't require any expensive operation like MSCK REPAIR TABLE or re-crawling.

I had a similar use case, for which I wrote a Python script that does the following:

Step 1 - Fetch the table information and parse the necessary information from it which is required to register the partitions.

# Fetching table information from glue catalog
# (l_client is assumed to be a boto3 Glue client, e.g. boto3.client('glue'))
logger.info("Fetching table info for {}.{}".format(l_database, l_table))
try:
    response = l_client.get_table(
        CatalogId=l_catalog_id,
        DatabaseName=l_database,
        Name=l_table
    )
except Exception as error:
    logger.error("Exception while fetching table info for {}.{} - {}"
                 .format(l_database, l_table, error))
    sys.exit(-1)

# Parsing table info required to create partitions from table
input_format = response['Table']['StorageDescriptor']['InputFormat']
output_format = response['Table']['StorageDescriptor']['OutputFormat']
table_location = response['Table']['StorageDescriptor']['Location']
serde_info = response['Table']['StorageDescriptor']['SerdeInfo']
partition_keys = response['Table']['PartitionKeys']

Step 2 - Generate a list of dictionaries, where each dictionary contains the information needed to create a single partition. All dictionaries have the same structure, but their partition-specific values change (year, month, day, hour).

def generate_partition_input_list(start_date, num_of_days, table_location,
                                  input_format, output_format, serde_info):
    input_list = []  # Initializing empty list
    today = datetime.utcnow().date()
    if start_date > today:  # To handle scenarios if any future partitions are created manually
        start_date = today
    end_date = today + timedelta(days=num_of_days)  # End date up to which partitions need to be created
    logger.info("Partitions to be created from {} to {}".format(start_date, end_date))

    for input_date in date_range(start_date, end_date):
        # Formatting partition values by padding required zeroes and converting into string
        year = str(input_date)[0:4].zfill(4)
        month = str(input_date)[5:7].zfill(2)
        day = str(input_date)[8:10].zfill(2)
        for hour in range(24):  # Looping over 24 hours to generate partition input for 24 hours for a day
            hour = str('{:02d}'.format(hour))  # Padding zero to make sure that hour is in two digits
            part_location = "{}{}/{}/{}/{}/".format(table_location, year, month, day, hour)
            input_dict = {
                'Values': [
                    year, month, day, hour
                ],
                'StorageDescriptor': {
                    'Location': part_location,
                    'InputFormat': input_format,
                    'OutputFormat': output_format,
                    'SerdeInfo': serde_info
                }
            }
            input_list.append(input_dict.copy())
    return input_list
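
The `date_range` helper used in the loop above is not shown in the answer. A minimal sketch of what it might look like follows; treating the end date as inclusive is my assumption, based on the "from {} to {}" log message, and not something the answer states.

```python
from datetime import date, timedelta


def date_range(start_date, end_date):
    """Yield each date from start_date up to and including end_date.

    Hypothetical stand-in for the helper the answer calls; the inclusive
    end date is an assumption.
    """
    for offset in range((end_date - start_date).days + 1):
        yield start_date + timedelta(days=offset)
```

If `end_date` is earlier than `start_date`, the generator simply yields nothing.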

Step 3 - Call the batch_create_partition() API

for each_input in break_list_into_chunks(partition_input_list, 100):
    create_partition_response = client.batch_create_partition(
        CatalogId=catalog_id,
        DatabaseName=l_database,
        TableName=l_table,
        PartitionInputList=each_input
    )

There is a limit of 100 partitions in a single API call, so if you are creating more than 100 partitions you will need to break your list into chunks and iterate over them.
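
The chunking helper called in Step 3 is not shown either. Below is one possible sketch of it, together with a small function for inspecting the `Errors` list that `batch_create_partition` returns. Both function bodies are my own illustration; the `failed_partitions` name and the choice to ignore `AlreadyExistsException` entries (so re-runs are harmless) are assumptions.

```python
def break_list_into_chunks(items, chunk_size):
    """Yield consecutive slices of `items`, each at most `chunk_size` long."""
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]


def failed_partitions(response, ignore_codes=('AlreadyExistsException',)):
    """Return the entries of response['Errors'] whose error code is not ignored.

    Assumes the response shape of batch_create_partition: a list of
    {'PartitionValues': [...], 'ErrorDetail': {'ErrorCode': ..., 'ErrorMessage': ...}}.
    """
    return [
        err for err in response.get('Errors', [])
        if err.get('ErrorDetail', {}).get('ErrorCode') not in ignore_codes
    ]
```

Checking the returned errors matters because the call can succeed as a whole while individual partitions in the batch fail.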

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.batch_create_partition

4
votes

This question is old, but I wanted to point out that you could have s3:ObjectCreated:Put notifications trigger a Lambda function which registers new partitions when data arrives on S3. I would even expand this function to handle partition removal based on object deletes and so on. Here's an AWS blog post which details S3 event notifications: https://aws.amazon.com/blogs/aws/s3-event-notification/
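
A rough sketch of the registration logic such a Lambda function might contain is shown below. It assumes keys laid out as `<table_prefix><year>/<month>/<day>/<hour>/<file>`, which is a hypothetical layout and not something stated in the question; `partition_values_from_key` and `register_partitions` are illustrative names, and `glue` is assumed to be a `boto3.client('glue')` passed in by the handler.

```python
from urllib.parse import unquote_plus


def partition_values_from_key(key, table_prefix, num_partition_keys=4):
    """Extract partition values from a key such as 'data/2020/01/15/03/file.json'."""
    if not key.startswith(table_prefix):
        return None
    parts = key[len(table_prefix):].split('/')
    # The last element is the file name; the ones before it are partition values
    if len(parts) < num_partition_keys + 1:
        return None
    return parts[:num_partition_keys]


def register_partitions(event, glue, database, table, table_prefix):
    """Register one Glue partition per distinct partition path in the S3 event."""
    descriptor = glue.get_table(DatabaseName=database, Name=table)['Table']['StorageDescriptor']
    seen = set()
    for record in event.get('Records', []):
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])  # keys arrive URL-encoded
        values = partition_values_from_key(key, table_prefix)
        if values is None or tuple(values) in seen:
            continue
        seen.add(tuple(values))
        location = "s3://{}/{}{}/".format(bucket, table_prefix, "/".join(values))
        try:
            glue.create_partition(
                DatabaseName=database,
                TableName=table,
                PartitionInput={
                    'Values': values,
                    # Reuse the table's formats and SerDe, overriding only the location
                    'StorageDescriptor': dict(descriptor, Location=location),
                },
            )
        except glue.exceptions.AlreadyExistsException:
            pass  # an earlier event already registered this partition
    return sorted(seen)
```

Since most events will refer to partitions that already exist, a production version would probably want to cache known partitions instead of relying on the exception for every object.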

2
votes

AWS Glue recently added a RecrawlPolicy that only crawls the new folders/partitions that you add to your S3 bucket.

https://docs.aws.amazon.com/glue/latest/dg/incremental-crawls.html

This should help you avoid crawling all the data again and again. From what I read, you can define incremental crawls while setting up your crawler or when editing an existing one. One thing to note, however, is that incremental crawls require the schema of the new data to be more or less the same as the existing schema.
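
For an existing crawler, the setting can probably be applied through the Glue API as well. The sketch below builds the arguments for `update_crawler`; the helper names are hypothetical, `glue` is assumed to be a `boto3.client('glue')`, and setting both schema change behaviors to `LOG` reflects my understanding of the incremental crawl requirements, so verify it against the linked page.

```python
def incremental_crawl_settings(crawler_name):
    """Keyword arguments for Glue's update_crawler that enable incremental crawls."""
    return {
        'Name': crawler_name,
        # Only crawl folders added since the last crawler run
        'RecrawlPolicy': {'RecrawlBehavior': 'CRAWL_NEW_FOLDERS_ONLY'},
        # Assumption: incremental crawls need both behaviors set to LOG
        'SchemaChangePolicy': {'UpdateBehavior': 'LOG', 'DeleteBehavior': 'LOG'},
    }


def enable_incremental_crawl(glue, crawler_name):
    """Apply the settings with a Glue client (assumed boto3.client('glue'))."""
    return glue.update_crawler(**incremental_crawl_settings(crawler_name))
```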