2 votes

I am trying to execute a Dataflow Python file that reads a text file from a GCS bucket, through an Airflow DAG using the DataFlowPythonOperator. The Python file runs successfully when executed on its own, but it fails when launched through Airflow. I am using a service account to authenticate my default GCP connection. The error I get when executing the job is:

{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
{models.py:1417} ERROR - DataFlow failed with return code 2
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/dataflow_operator.py", line 182, in execute
    self.py_file, self.py_options)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 152, in start_python_dataflow
    task_id, variables, dataflow, name, ["python"] + py_options)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow
    _Dataflow(cmd).wait_for_done()
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done
    self._proc.returncode))
Exception: DataFlow failed with return code 2

My airflow script:

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from datetime import datetime, timedelta 

# Default DAG parameters
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': <email>,
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime(2018, 4, 30),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'dataflow_default_options': {
        'project': '<Project ID>'        
    }
}

dag = DAG(
    dag_id='df_dag_readfromgcs', 
    default_args=default_args, 
    schedule_interval=timedelta(minutes=60)
    )

task1 = DataFlowPythonOperator(
    task_id='task1',    
    py_file='~<path>/1readfromgcs.py',
    gcp_conn_id='default_google_cloud_connection',
    dag=dag
)

My Dataflow Python file (1readfromgcs.py) contains the following code:

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def runCode(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        default='<Input file path>',
                        help='File name')   
    known_args, pipeline_args = parser.parse_known_args(argv) 

    pipeline_args.extend([
        '--project=<project name>',
        '--runner=DataflowRunner',
        '--job_name=<job name>',
        '--region=europe-west1',
        '--staging_location=<GCS staging location>',
        '--temp_location=<GCS temp location>'
    ])

    pipeline_options = PipelineOptions(pipeline_args)
    p = beam.Pipeline(options=pipeline_options)

    rows = p | 'read' >> beam.io.ReadFromText(known_args.input) 

    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    runCode()

I am unable to figure out the reason for this exception. As per my investigation of the Airflow source (https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/gcp_dataflow_hook.py), the exception is raised by the following lines:

def wait_for_done(self):
    reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
    self.log.info("Start waiting for DataFlow process to complete.")
    while self._proc.poll() is None:
        ret = select.select(reads, [], [], 5)
        if ret is not None:
            for fd in ret[0]:
                line = self._line(fd)
                self.log.debug(line[:-1])
        else:
            self.log.info("Waiting for DataFlow process to complete.")
    if self._proc.returncode is not 0:
        raise Exception("DataFlow failed with return code {}".format(
            self._proc.returncode))
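For what it's worth, a return code of 2 can be reproduced outside Airflow: CPython itself exits with status 2 when it cannot open the script it was given, and subprocess.Popen (which the hook uses) does not expand a leading ~ in a path the way an interactive shell does. A minimal sketch, with a hypothetical path:

```python
import subprocess
import sys

# CPython exits with status 2 when it cannot open the script file it is
# given. subprocess.Popen performs no shell tilde expansion, so a py_file
# path starting with '~' reaches the interpreter verbatim and fails.
proc = subprocess.Popen(
    [sys.executable, '~/no_such_dir/1readfromgcs.py'],  # hypothetical path
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
proc.communicate()
print(proc.returncode)  # 2 -- the same code the hook raises on
```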

Appreciate your thoughts and help with my issue.

I am not really familiar with Airflow, but I see nothing strange that would make the Dataflow job fail. In any case, given that the pipeline works properly when you execute it in Dataflow directly (without going through Airflow), the error does indeed look like it is on the Airflow side. If the application ends in the method wait_for_done(), I understand that the job is run in your GCP project but fails during execution, so you should be able to find more details in the Dataflow UI and the logs. Please see if there is any relevant info to add from there. - dsesto

1 Answer

0 votes

This exception stems from _proc, which is a subprocess: the value raised is the exit code of the shell command that subprocess ran.

I haven't worked with this component yet. Depending on what is being executed, exit code 2 hints at the reason for the exit. For example, in Bash this exit code means:

Misuse of shell builtins

and could be connected to

Missing keyword or command, or permission problem

So it might be connected to the underlying Dataflow configuration. Try executing the file manually while impersonating the airflow user.
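One thing worth checking before that: the DAG passes py_file='~<path>/1readfromgcs.py', and the hook launches it via subprocess, which does not expand ~ the way an interactive shell does. A hedged sketch of expanding the path up front, using a hypothetical path for illustration:

```python
import os.path

# subprocess performs no shell tilde expansion, so a py_file value
# beginning with '~' reaches the Python interpreter verbatim. Expanding
# the path explicitly (or using an absolute path) avoids that failure mode.
raw_path = '~/dags/1readfromgcs.py'      # hypothetical path for illustration
py_file = os.path.expanduser(raw_path)   # e.g. '/home/airflow/dags/...'
print(py_file)
```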