
I am following the Google Cloud documentation to schedule a Datastore export job from a Cloud Function: https://cloud.google.com/datastore/docs/schedule-export

import base64
import json
import os

from googleapiclient.discovery import build

datastore = build('datastore', 'v1')
project_id = os.environ.get('GCP_PROJECT')


def datastore_export(event, context):
    '''Triggers a Datastore export from a Cloud Scheduler job.

    Args:
        event (dict): event[data] must contain a json object encoded in
            base-64. Cloud Scheduler encodes payloads in base-64 by default.
            Object must include a 'bucket' value and can include 'kinds'
            and 'namespaceIds' values.
        context (google.cloud.functions.Context): The Cloud Functions event
            metadata.
    '''

    json_data = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    bucket = json_data['bucket']
    entity_filter = {}

    if 'kinds' in json_data:
        entity_filter['kinds'] = json_data['kinds']

    if 'namespaceIds' in json_data:
        entity_filter['namespaceIds'] = json_data['namespaceIds']

    request_body = {
        'outputUrlPrefix': bucket,
        'entityFilter': entity_filter
    }

    export_request = datastore.projects().export(
        projectId=project_id,
        body=request_body
    )
    response = export_request.execute()
    print(response)

The response object above contains a field that indicates the state of the operation (rough shape shown below).

  • If the operation is still ongoing, the state is "PROCESSING"
  • If the operation is finished, the state is something else (probably "DONE"), and the response also provides the URL where the file has been stored in GCS
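
For reference, the printed response is a long-running operation object with roughly this shape (field names taken from the Datastore admin API and the answers below; the values are placeholders, not from a real run):

# Illustrative only -- values are placeholders
response = {
    'name': 'projects/<project-id>/operations/<operation-id>',
    'metadata': {
        '@type': 'type.googleapis.com/google.datastore.admin.v1.ExportEntitiesMetadata',
        'common': {
            'state': 'PROCESSING',
        },
        'outputUrlPrefix': 'gs://<bucket>',
    },
}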

Since I am writing a BigQuery import job right after the export, it's crucial for me to execute the BigQuery import AFTER the Datastore Export is finished.

How can I call the API to verify that the job is completed, and get the URL where the Export has been saved?

Thanks.


3 Answers

0
votes

It's not the BigQuery job that should be asking when to start; instead, the Datastore export should trigger it. There are many ways to do this (a sketch of the event-driven idea follows below), but I found a tutorial that you might find helpful: how to run your queries as soon as a new Google Analytics table is available
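
One way to apply that idea here, as a minimal sketch: deploy a second Cloud Function with a google.storage.object.finalize trigger on the export bucket and start the BigQuery load from there. This is not the tutorial's exact setup; the function name, the destination dataset/table, and the per-kind metadata filename check are assumptions.

from google.cloud import bigquery


def start_bq_import(event, context):
    """Triggered when the Datastore export writes an object to the bucket."""
    object_name = event['name']

    # BigQuery's DATASTORE_BACKUP format expects the per-kind metadata file
    # (ends with '.export_metadata'); ignore everything else the export writes.
    if not object_name.endswith('.export_metadata'):
        return

    uri = 'gs://{}/{}'.format(event['bucket'], object_name)

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.DATASTORE_BACKUP,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    # 'my_dataset.my_kind' is a placeholder destination table.
    load_job = client.load_table_from_uri(uri, 'my_dataset.my_kind',
                                          job_config=job_config)
    print('Started BigQuery load job {}'.format(load_job.job_id))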

0
votes

There is a way to get this done, but keep in mind that Cloud Functions have a maximum timeout of 9 minutes, so if the export cannot finish in less than that time, the function will time out. In my case the whole function takes 2-3 minutes max, so there is plenty of margin to consider it safe.

Here is the function that I wrote. It checks the status of the operation every SECONDS_BETWEEN_OPERATION_STATUS_CHECKS seconds (I've set it to 5 seconds) and returns whether the operation was successful or not:

import time

# How long to wait between polls of the operation status.
SECONDS_BETWEEN_OPERATION_STATUS_CHECKS = 5


def wait_until_operation_finished(operation_id):
    """Monitor an operation's progress and wait until it completes.

    Args:
        operation_id: Cloud Operation ID.
    Returns:
        True if the operation succeeded without errors, False if not.
    """
    operation_in_progress = True
    operation_success = True

    print("Tracking operation {} status".format(operation_id))
    while operation_in_progress:
        # `datastore` is the discovery client built in the question's code.
        op = datastore.projects().operations().get(name=operation_id)
        res = op.execute()

        operation_status = res['metadata']['common']['state']
        print(operation_status)

        if operation_status == 'SUCCESSFUL':
            operation_in_progress = False
        elif operation_status in {'ERROR', 'FAILED', 'CANCELLED'}:
            operation_in_progress = False
            operation_success = False
        else:
            # Still in progress (e.g. PROCESSING); wait before polling again.
            time.sleep(SECONDS_BETWEEN_OPERATION_STATUS_CHECKS)

    print("Finished operation")
    return operation_success

Then you simply call this function after starting your export, like so:

response = export_request.execute()

operation_id = response['name']

if wait_until_operation_finished(operation_id):
    print("Export generated successfully.")
    return response
else:
    raise Exception("Failed to generate Datastore export.")
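
The question also asked for the URL where the export was saved. Once the operation is SUCCESSFUL, the same operations().get call should expose the export location; here is a hedged sketch (the 'response' / 'outputUrl' field names come from the Datastore admin API docs, not from the code above):

def get_export_output_url(operation_id):
    """Return the GCS URL of a finished export's overall metadata file."""
    res = datastore.projects().operations().get(name=operation_id).execute()
    # For a successful export this points at the .overall_export_metadata file;
    # a BigQuery load then targets the per-kind .export_metadata files under
    # the same prefix.
    return res.get('response', {}).get('outputUrl')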

-1
votes

Not directly answering the question, but you may be interested in using Airflow. It is an orchestrator and is well suited to this kind of scheduling, such as waiting for one task to finish before starting another; it also supports retries and dependencies between tasks if needed.
For your use case, the first task would be the Datastore export, and once that task has completed successfully, the second task, the BigQuery import, would run (see the sketch below).
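
A minimal sketch of such a DAG, assuming Airflow 2 with two plain PythonOperator tasks (the callables, DAG id, and schedule are placeholders; the Google provider also ships dedicated Datastore and BigQuery operators you could use instead):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_datastore_export(**kwargs):
    # Call the Datastore admin API (as in the question) and poll the
    # operation until it is SUCCESSFUL, e.g. with wait_until_operation_finished().
    ...


def run_bigquery_import(**kwargs):
    # Load the per-kind export_metadata file into BigQuery with
    # source_format=DATASTORE_BACKUP (google-cloud-bigquery client).
    ...


with DAG(
    dag_id='datastore_to_bigquery',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    export_task = PythonOperator(
        task_id='datastore_export',
        python_callable=run_datastore_export,
    )
    import_task = PythonOperator(
        task_id='bigquery_import',
        python_callable=run_bigquery_import,
    )

    # The BigQuery import only runs after the export task succeeds.
    export_task >> import_task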