
I am calling Dataflow jobs from Airflow in Google Cloud Composer:

a >> b >> c

a, b, and c are tasks that call Dataflow jobs. I want to run b only after a's Dataflow job completes, but the problem is that they all run simultaneously.

How can I make each task wait until the previous Dataflow job completes?

You might want to check this article, where ExternalTaskSensor is implemented to determine whether the previous DAG is done. – Ricco D
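
For reference, a minimal sketch of that approach, assuming the upstream Dataflow task runs in a separate DAG on the same schedule (the 'upstream_dataflow_dag' and 'a' ids below are hypothetical):

    from airflow.sensors.external_task import ExternalTaskSensor

    # Hypothetical ids: wait for task 'a' of 'upstream_dataflow_dag' to succeed
    # for the same logical date before this DAG continues.
    wait_for_upstream_dataflow = ExternalTaskSensor(
        task_id='wait-for-upstream-dataflow',
        external_dag_id='upstream_dataflow_dag',
        external_task_id='a',
        poke_interval=60,
    )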

1 Answer


After submitting a job, you need to add a sensor that verifies the job has completed.

Example:

    from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
    from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
    from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

    # Submit the Dataflow job asynchronously: the task finishes as soon as the
    # job is submitted, and the job details are pushed to XCom.
    start_python_job_async = DataflowCreatePythonJobOperator(
        task_id="start-python-job-async",
        py_file=GCS_PYTHON,
        py_options=[],
        job_name='{{task.task_id}}',
        options={
            'output': GCS_OUTPUT,
        },
        py_requirements=['apache-beam[gcp]==2.25.0'],
        py_interpreter='python3',
        py_system_site_packages=False,
        location='europe-west3',
        wait_until_finished=False,
    )

    # Block downstream tasks until the submitted Dataflow job reaches JOB_STATE_DONE.
    wait_for_python_job_async_done = DataflowJobStatusSensor(
        task_id="wait-for-python-job-async-done",
        job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
        location='europe-west3',
    )

    start_python_job_async >> wait_for_python_job_async_done

You can view the docs and the examples, which explain this further.
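
Applied to the question's a >> b >> c chain, the same submit-then-sense pattern can be repeated for each job. The sketch below is illustrative rather than an official example: the helper name, the GCS_PYTHON_A/B/C paths, and the simplified operator arguments are assumptions; the operator and sensor calls mirror the example above and would sit inside your DAG definition.

    from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
    from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
    from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor


    def dataflow_job_with_wait(name, py_file):
        # Hypothetical helper: submit one Dataflow job asynchronously and return
        # the (start, wait) task pair so callers can chain them.
        start = DataflowCreatePythonJobOperator(
            task_id=f"start-{name}",
            py_file=py_file,
            job_name='{{task.task_id}}',
            py_interpreter='python3',
            location='europe-west3',
            wait_until_finished=False,
        )
        wait = DataflowJobStatusSensor(
            task_id=f"wait-{name}",
            job_id=f"{{{{task_instance.xcom_pull('start-{name}')['job_id']}}}}",
            expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
            location='europe-west3',
        )
        return start, wait


    # GCS_PYTHON_A/B/C are placeholders for the three pipeline files.
    start_a, wait_a = dataflow_job_with_wait('a', GCS_PYTHON_A)
    start_b, wait_b = dataflow_job_with_wait('b', GCS_PYTHON_B)
    start_c, wait_c = dataflow_job_with_wait('c', GCS_PYTHON_C)

    # b is submitted only after a's Dataflow job reaches DONE, and c only after b's.
    start_a >> wait_a >> start_b >> wait_b >> start_c >> wait_c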