4 votes

Currently, these are the options I know of to schedule the execution of a Dataflow job:

  • Using App Engine Cron Service or Cloud Functions.

    • This example is in Java. Is there any official Python example that is as simple?
    • This example is in Python, but I'm not sure whether it is currently still a good option or is "deprecated"
  • From a cron job on a Compute Engine instance


    • Is there any tutorial for this?
  • Using windowing in a streaming pipeline

    • I think this is the easiest, but is it the best option in terms of total cost?
  • Cloud Scheduler

    • Is this a valid method?
I found this example, but I would love to know how to write the Cloud Function in Python. – WIT
From my understanding, the most "up to date" method is using Airflow, because in case your job fails, Airflow can pick up where it left off. – WIT

2 Answers

1 vote

I use App Engine Flex as my Dataflow launcher. This microservice has endpoints to launch Dataflow jobs on demand, which cron can hit too.

This is my project structure:

df_tasks/
- __init__.py
- datastore_to_csv.py
- ...other_pipelines
__init__.py
dflaunch.yaml
main.py
setup.py <-- used by pipelines

The trick with this for me was getting my pipeline dependencies set up correctly. Namely, using a setup.py for pipeline dependencies. Setting it up like this example helped out the most: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/juliaset

setup.py:

import setuptools

setuptools.setup(
    name='dataflow_python_pipeline',
    version='1.0.0',
    description='DataFlow Python Pipeline',
    packages=setuptools.find_packages(),
)

My pipeline configs in df_tasks then look like this:

from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions.from_dictionary({
    'project': project,
    'runner': 'DataflowRunner',
    'staging_location': bucket_path + '/staging',
    'temp_location': bucket_path + '/temp',
    'setup_file': './setup.py'
})

Then in main.py:

import datetime
import os
import traceback

from flask import Flask, jsonify, request

from df_tasks import datastore_to_csv

app = Flask(__name__)
project_id = os.environ['GCLOUD_PROJECT']

@app.route('/datastore-to-csv', methods=['POST'])
def df_day_summary():
    # Extract the payload
    try:
        payload = request.get_json()
        model = payload['model']
        for_date = datetime.datetime.strptime(payload['for_date'], '%Y/%m/%d')
    except Exception:
        print(traceback.format_exc())
        return traceback.format_exc()
    # Launch the job
    try:
        job_id, job_name = datastore_to_csv.run(
            project=project_id,
            model=model,
            for_date=for_date,
        )
        # Return the job id and name
        return jsonify({'jobId': job_id, 'jobName': job_name})
    except Exception:
        print(traceback.format_exc())
        return traceback.format_exc()
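
For reference, here is a minimal sketch of what the run() function in df_tasks/datastore_to_csv.py could look like, so that it returns a job id and job name the way main.py expects. The pipeline body (the Create/Map/WriteToText steps), the bucket path, and the assumption that the Dataflow result object exposes job_id() are my own illustrative choices, not part of the original project.

import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(project, model, for_date, bucket_path='gs://my-bucket'):  # bucket is a placeholder
    # Name the job so the caller can report it back.
    job_name = 'datastore-to-csv-{}'.format(int(time.time()))
    pipeline_options = PipelineOptions.from_dictionary({
        'project': project,
        'job_name': job_name,
        'runner': 'DataflowRunner',
        'staging_location': bucket_path + '/staging',
        'temp_location': bucket_path + '/temp',
        'setup_file': './setup.py',
    })

    pipeline = beam.Pipeline(options=pipeline_options)
    (pipeline
     | 'CreateRows' >> beam.Create([{'model': model}])  # stand-in for a real Datastore read
     | 'FormatCsv' >> beam.Map(lambda row: ','.join(str(v) for v in row.values()))
     | 'WriteCsv' >> beam.io.WriteToText(
         '{}/exports/{}/{}'.format(bucket_path, model, for_date.date())))

    # pipeline.run() submits the job to Dataflow and returns without blocking;
    # job_id() on the returned result object is assumed here.
    result = pipeline.run()
    return result.job_id(), job_name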
0 votes

There are multiple ways, but one that I think would be very convenient for you is using the DataflowPythonOperator of Apache Airflow.

GCP offers a managed service for Apache Airflow in the form of Cloud Composer, which you can use to schedule Dataflow pipelines, or other GCP operations.
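
For illustration, a minimal DAG using that operator could look like the sketch below. It assumes the older airflow.contrib module path (newer Google provider releases rename the operator to DataflowCreatePythonJobOperator), and the project id, bucket, schedule, and py_file path are all placeholders.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

default_args = {
    'start_date': datetime(2019, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('daily_dataflow_export',
         default_args=default_args,
         schedule_interval='0 3 * * *',   # run the pipeline every day at 03:00
         catchup=False) as dag:

    launch_pipeline = DataFlowPythonOperator(
        task_id='datastore_to_csv',
        py_file='gs://my-bucket/pipelines/datastore_to_csv.py',  # placeholder path
        options={'setup_file': './setup.py'},
        dataflow_default_options={
            'project': 'my-gcp-project',              # placeholder project id
            'temp_location': 'gs://my-bucket/temp',
            'staging_location': 'gs://my-bucket/staging',
        },
    )

With Cloud Composer you only need to upload the DAG file (and the pipeline module it references) to the environment's DAGs bucket; Composer takes care of scheduling and retries.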