
I am running an Apache Beam pipeline (deployed with Google Dataflow) which is being orchestrated with Apache Airflow.

The DAG file looks like the following:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator

import custom_py_file #beam job in this file 


default_args = {
    'owner': 'name',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

CONNECTION_ID = 'proj'

with DAG('dag_pipeline', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:


    lines = PythonVirtualenvOperator(
        task_id='lines',
        python_callable=custom_py_file.main, #this file has a function main() where the beam job is declared 
        requirements=['apache-beam[gcp]', 'pandas'],
        python_version=3,
        dag=dag
    )

lines

The beam pipeline file (custom_py_file.py) is as follows:

def main():
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    import argparse
    import logging
    import time


    class ETL(beam.DoFn):
        def process(self, row):
            # process data
            pass

    def run(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--input',
            dest='input',
            default='gs://bucket/input/input.txt',
            help='Input file to process.'
            )
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_args.extend([
              '--runner=DataflowRunner',
              '--project=proj',
              '--region=region',
              '--staging_location=gs://bucket/staging/',
              '--temp_location=gs://bucket/temp/',
              '--job_name=name-{}'.format(time.strftime("%Y%m%d%H%M%S").lower()),
              '--setup_file=/home/airflow/gcs/dags/setup.py',
              '--disk_size_gb=350',
              '--machine_type=n1-highmem-96',
              '--num_workers=24',
              '--autoscaling_algorithm=NONE'
              ]) 

        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True

        with beam.Pipeline(options=pipeline_options) as p:
            rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
            etl = (rows | 'process data' >> beam.ParDo(ETL()))

        p.run().wait_until_finish()

    logging.getLogger().setLevel(logging.DEBUG)
    run()

I am using a PythonVirtualenvOperator because I cannot use Python 3 with a BashOperator on my current Airflow version (1.10.2-composer), and I need Python 3 to run this pipeline.
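As a side note, all imports live inside main() because the PythonVirtualenvOperator serializes the callable and executes it in a separate virtualenv process, so the function has to be self-contained. A minimal sketch of that pattern (hypothetical callable name):

# Minimal self-contained callable for PythonVirtualenvOperator (hypothetical
# example): imports happen inside the function because it runs in its own
# virtualenv process, isolated from the DAG file's module-level imports.
def my_callable():
    import pandas as pd  # installed via the operator's `requirements` list
    return pd.__version__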

The problem is that despite a successful run, Airflow submits another Dataflow job. Note that this is NOT a retry, as the logs show it is all "one" task run. However, the Dataflow logs show the exact same job running again after it has already run successfully once.

[screenshot: Dataflow logs]

What is going on here? Is the successful Dataflow job not returning an exit code of 0? How do I get Airflow to move on to the next task once the job has run correctly? Thanks!

How do you run the pipeline in your Python code? Can something similar to this be happening? – Guillem Xercavins
I updated the question to include my Beam file @GuillemXercavins – manesioz
Yes, I think that might be it. I will change it and try it again, thanks! – manesioz

1 Answer


The fact that it's not considered a retry and that a second job executes after the first one ends made me suspect something similar to this. Checking your Python code, I see that you use both the with beam.Pipeline() context manager and an explicit p.run():

with beam.Pipeline(options=pipeline_options) as p:
    rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
    etl = (rows | 'process data' >> beam.ParDo(ETL()))

p.run().wait_until_finish()

This triggers two consecutive executions of the same pipeline. Use either approach, but not both. Either rely on the with statement, which runs the pipeline and waits for it when the block exits:

with beam.Pipeline(options=pipeline_options) as p:
    rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
    etl = (rows | 'process data' >> beam.ParDo(ETL()))

or build the pipeline without the context manager and run it explicitly:

p = beam.Pipeline(options=pipeline_options)

rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))

p.run().wait_until_finish()
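
The with form is generally preferable, because the context manager already runs the pipeline and waits for it when the block exits. Conceptually it does something like this (a simplified sketch for illustration, not the exact SDK source):

# Simplified sketch of Pipeline's context-manager behaviour in the Python SDK.
# This is why an additional p.run() after the with block submits a second job.
class Pipeline(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not exc_type:
            self.result = self.run()          # submits the Dataflow job
            self.result.wait_until_finish()   # blocks until it completes

So keeping both the with block and the explicit p.run().wait_until_finish() submits the same pipeline graph twice, which matches the duplicated job you see in the Dataflow logs.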