
I'm having a few problems running a relatively vanilla Dataflow job from an AI Platform Notebook (the job is meant to take data from BigQuery > cleanse and prep > write to a CSV in GCS):

options = {'staging_location': '/staging/location/',
           'temp_location': '/temp/location/',
           'job_name': 'dataflow_pipeline_job',
           'project': PROJECT,
           'teardown_policy': 'TEARDOWN_ALWAYS',
           'max_num_workers': 3,
           'region': REGION,
           'subnetwork': 'regions/<REGION>/subnetworks/<SUBNETWORK>',
           'no_save_main_session': True}
opts = beam.pipeline.PipelineOptions(flags=[], **options)  
p = beam.Pipeline('DataflowRunner', options=opts)
(p 
 | 'read' >> beam.io.Read(beam.io.BigQuerySource(query=selquery, use_standard_sql=True))
 | 'csv' >> beam.FlatMap(to_csv)
 | 'out' >> beam.io.Write(beam.io.WriteToText('OUTPUT_DIR/out.csv')))
p.run()
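For context, the `to_csv` function used in the `FlatMap` above isn't shown in the post; a minimal sketch of such a helper (field names here are hypothetical, not from the original code) that turns a BigQuery row dict into a CSV line might look like:

```python
# Hypothetical sketch: the field names are illustrative only.
def to_csv(row):
    # BigQuery rows arrive as dicts; yield (for FlatMap) one CSV line
    # per row, skipping rows with any missing value.
    fields = ['id', 'name', 'score']
    if any(row.get(f) is None for f in fields):
        return
    yield ','.join(str(row[f]) for f in fields)
```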

Error returned from stackdriver:

Workflow failed. Causes: The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. You can get help with Cloud Dataflow at https://cloud.google.com/dataflow/support.

Following warning:

S01:eval_out/WriteToText/Write/WriteImpl/DoOnce/Read+out/WriteToText/Write/WriteImpl/InitializeWrite failed.

Unfortunately not much else other than that. Other things to note:

  • The job ran locally without any error
  • The network is running in custom mode but is the default network
  • Python Version == 3.5.6
  • Python Apache Beam version == 2.16.0
  • The AI Platform Notebook is in fact a GCE instance with a Deep Learning VM image deployed on top (with a container-optimised OS); we have then used port forwarding to access the Jupyter environment
  • The service account requesting the job (the Compute Engine default service account) has the permissions required for this job
  • Notebook instance, dataflow job, GCS bucket are all in europe-west1
  • I've also tried running this on a standard AI Platform Notebook and still the same problem.

Any help would be much appreciated! Please let me know if there is any other info I can provide which will help.


I've realised that my error is the same as the following:

Why do Dataflow steps not start?

The reason my job has gotten stuck is that the write-to-GCS step runs first, even though it is meant to run last. Any ideas on how to fix this?

Is it possible to capture the command line output from the notebook? Or can you instead run it on the command line and capture the output? This will print out a link to the Dataflow UI, where you can debug further. You could also navigate to the Dataflow UI manually and locate the most recently run job to get to the same page. Please take a look at these instructions and see if you can identify anything from the Dataflow UI's Stackdriver logs. cloud.google.com/dataflow/docs/guides/using-monitoring-intf - Alex Amato
Are you installing any dependencies via --requirements_file, --extra_package or --setup_file? Also please note that some steps may not require any workers, and they are marked as running the whole time the pipeline is running. WriteToText is one of those. So even when your workers are still not up, WriteToText will be in the "running" state. - Anton Bryzgalov

1 Answer


Upon code inspection, I noticed that the syntax of the WriteToText transform used does not match the one suggested in the Apache Beam docs.

Please follow the WriteToText argument syntax as outlined here.

The suggested workaround is to use BigQuery's built-in export-to-CSV option, available in batch mode.
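As a sketch of that export (project, dataset, table and bucket names below are placeholders), the bq command-line tool can do it directly:

```shell
# Placeholder names: replace the project, dataset, table and bucket.
# Exports the table to GCS as CSV (CSV is the default destination format).
bq extract --destination_format=CSV \
    'myproject:mydataset.mytable' \
    'gs://my-bucket/export/out-*.csv'
```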

There are even more export options available; the full list can be found in the data formats and compression types documentation here.