I am running an Apache Beam pipeline (deployed with Google Dataflow) that is orchestrated by Apache Airflow.
The DAG file looks like the following:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator

import custom_py_file  # beam job in this file

default_args = {
    'owner': 'name',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

CONNECTION_ID = 'proj'

with DAG('dag_pipeline', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'],
         max_active_runs=15, catchup=True, default_args=default_args) as dag:

    lines = PythonVirtualenvOperator(
        task_id='lines',
        python_callable=custom_py_file.main,  # this file has a function main() where the beam job is declared
        requirements=['apache-beam[gcp]', 'pandas'],
        python_version=3,
        dag=dag
    )

    lines
The Beam pipeline file (custom_py_file.py) is as follows:
def main():
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    import argparse
    import logging
    import time

    class ETL(beam.DoFn):
        def process(self, row):
            # process data

    def run(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--input',
            dest='input',
            default='gs://bucket/input/input.txt',
            help='Input file to process.'
        )
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_args.extend([
            '--runner=DataflowRunner',
            '--project=proj',
            '--region=region',
            '--staging_location=gs://bucket/staging/',
            '--temp_location=gs://bucket/temp/',
            '--job_name=name-{}'.format(time.strftime("%Y%m%d%h%M%s").lower()),
            '--setup_file=/home/airflow/gcs/dags/setup.py',
            '--disk_size_gb=350',
            '--machine_type=n1-highmem-96',
            '--num_workers=24',
            '--autoscaling_algorithm=NONE'
        ])
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True

        with beam.Pipeline(options=pipeline_options) as p:
            rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
            etl = (rows | 'process data' >> beam.ParDo(ETL()))
            p.run().wait_until_finish()

    logging.getLogger().setLevel(logging.DEBUG)
    run()
I am using a PythonVirtualenvOperator because I cannot use Python 3 with a BashOperator on my current version of Airflow (1.10.2-composer), and I need Python 3 to run this pipeline.
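For context, this is roughly the BashOperator version I would have preferred to use if Python 3 were available to it; the working directory and the inline call to custom_py_file.main() are only illustrative of my layout:

from airflow.operators.bash_operator import BashOperator

# Roughly what I would have used if Python 3 worked from a BashOperator.
# The path and the inline invocation of main() are illustrative only.
lines = BashOperator(
    task_id='lines',
    bash_command=(
        'cd /home/airflow/gcs/dags && '
        'python3 -c "import custom_py_file; custom_py_file.main()"'
    ),
    dag=dag,
)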
The problem is that despite a successful run, Airflow submits another Dataflow job. Note that this is NOT a retry: the Airflow logs show it all as a single task run, yet the Dataflow logs show the exact same job being submitted and run again after it has already completed successfully once.
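In case it matters, this is the kind of explicit result check I was planning to try next inside run(), so I can see what state Dataflow actually reports back to the task; it is only a sketch based on my reading of Beam's PipelineResult API, not something I have confirmed:

# Sketch of what I plan to try inside run(): build the pipeline without the
# with-block, then capture the PipelineResult and log its terminal state.
pipeline = beam.Pipeline(options=pipeline_options)
rows = pipeline | 'read rows' >> beam.io.ReadFromText(known_args.input)
etl = rows | 'process data' >> beam.ParDo(ETL())
result = pipeline.run()
result.wait_until_finish()
logging.info('Pipeline finished in state: %s', result.state)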
What is going on here? Is the successful Dataflow job not returning an exit value of 0? How do I get Airflow to move on to the next task once the job has run correctly? Thanks!