I am using Google Cloud Composer to launch Dataflow jobs.
My DAG consists of two Dataflow jobs that should be run one after the other:
import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

# Defaults shared by both Dataflow tasks.
default_dag_args = {
    'start_date': datetime.datetime(2019, 10, 23),
    'dataflow_default_options': {
        'project': 'myproject',
        'region': 'europe-west1',
        'zone': 'europe-west1-c',
        'tempLocation': 'gs://somebucket/',
    }
}

with models.DAG(
        'some_name',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    parameters = {'params': "param1"}
    t1 = DataflowTemplateOperator(
        task_id='dataflow_example_01',
        template='gs://path/to/template/template_001',
        parameters=parameters,
        dag=dag)

    parameters2 = {'params': "param2"}
    t2 = DataflowTemplateOperator(
        task_id='dataflow_example_02',
        template='gs://path/to/templates/template_002',
        parameters=parameters2,
        dag=dag)

    # Run the second job only after the first one finishes.
    t1 >> t2
When I check in the Dataflow console, the job has succeeded and all the files it is supposed to create are there, but it appears to have run in a US region, even though the Cloud Composer environment is in europe-west1.
In Airflow, however, the first task still shows as running, so the second one is never launched.
What should I add to the DAG to make both tasks complete, and how do I get the jobs to run in Europe?
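For what it's worth, this is the kind of per-task override I was hoping to be able to write; the explicit location argument below is just my guess at how it might work, and I have not confirmed that the contrib DataflowTemplateOperator in my Composer image accepts it:

    # Sketch only: 'location' is a guess at a per-task region override.
    # I have not verified that airflow.contrib's DataflowTemplateOperator
    # supports this argument in my Airflow/Composer version.
    t1 = DataflowTemplateOperator(
        task_id='dataflow_example_01',
        template='gs://path/to/template/template_001',
        parameters=parameters,
        location='europe-west1',  # hypothetical argument
        dag=dag)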
Any advice or solution on how to proceed would be most appreciated. Thanks!
