
I am having an issue trying to run a GCP Cloud-Dataflow pipeline.

The pipeline works when running locally using the "DirectRunner" but fails when trying to run in dataflow with the "DataflowRunner".

It fails immediately when calling run() on the pipeline, with the error message given above (as opposed to first deploying to GCP and then failing while the pipeline is actually running).

The exception is thrown inside a call to beam.io.WriteToBigQuery:

(bq_rows 
| 'map_to_row' >> beam.Map(to_pred_row)
| 'write_to_table' >> beam.io.WriteToBigQuery(
    'my_dataset_name.my_table_name', 
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

If I replace the last node in the pipeline with something that just writes to file:

(bq_rows 
| 'map_to_row' >> beam.Map(to_pred_row)
| 'debug_write_to_csv_2' >> beam.io.WriteToText(additional_opts.out_path, ".txt"))

Then everything works as expected and I get a text file with all the records I would expect.

If I run everything as is with the WriteToBigQuery() function but change back to the DirectRunner (and change nothing else) then everything works and the new rows are written to the BQ table.

As far as I can tell there is nothing remarkable about the records flowing into the WriteToBigQuery node. I have written them out to a text file both when running locally and in the cloud in an effort to isolate a cause, but the two outputs are identical (and match the schema of the destination table). In any event, it doesn't seem like things get far enough for an unexpected value or parameter to matter: as mentioned above, the error occurs as soon as I call run() on the pipeline.
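One way to sanity-check the rows inside the pipeline itself is to insert a validating Map stage just before the write. This is only a debug sketch: EXPECTED_KEYS and check_row are names I am introducing here, with the key set assumed from the destination table's columns.

```python
# Debug sketch: validate each row dict against the destination schema
# before the BigQuery write. EXPECTED_KEYS is an assumption based on
# the destination table's columns; adjust it to your schema.
EXPECTED_KEYS = {'word'}

def check_row(row):
    # Fail fast, reporting the offending row, if its keys don't match.
    assert set(row) == EXPECTED_KEYS, 'unexpected row shape: %r' % (row,)
    return row
```

Adding "| beam.Map(check_row)" just before the WriteToBigQuery step would then surface the first malformed row at the validation stage rather than at the sink.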

Where am I going wrong?


UPDATE:

Here is a Minimal Example of the same behaviour. Having created a table named temp_e.words with a single (STRING, REQUIRED) column named word, I can reproduce the behaviour with this code:

import datetime
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import (GoogleCloudOptions, 
                                                  StandardOptions)


def to_row(word):
  return {
      'word': word
  }

def run_pipeline(local_mode):

  PROJECT = 'searchlab-data-insights'
  REGION = 'us-central1'
  GCS_BUCKET_PATH = 'gs://temp-staging-e'

  timestamp = datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  options = beam.pipeline.PipelineOptions([
      '--project', PROJECT
  ])

  if local_mode:
    RUNNER = 'DirectRunner'
  else:
    RUNNER = 'DataflowRunner'

  google_cloud_options = options.view_as(GoogleCloudOptions)
  google_cloud_options.project = PROJECT
  google_cloud_options.job_name = 'test-{}'.format(timestamp)
  google_cloud_options.staging_location = os.path.join(GCS_BUCKET_PATH, 'staging')
  google_cloud_options.temp_location = os.path.join(GCS_BUCKET_PATH, 'tmp')
  options.view_as(StandardOptions).runner = RUNNER

  p = beam.Pipeline(RUNNER, options=options)
  bq_rows = p | beam.Create(['words', 'to', 'store']) 

  (bq_rows 
    | 'map_to_row' >> beam.Map(to_row)
    | 'write_to_table' >> beam.io.WriteToBigQuery(
        'temp_e.words', 
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
  )

  job = p.run()

  if local_mode:
    job.wait_until_finish()
    print('Done!')

Now running run_pipeline(local_mode=True) produces the correct result and the rows are appended whereas running run_pipeline(local_mode=False) immediately triggers the error.

The full error generated is here: https://pastebin.com/xx8wwtXV

Show your code. Your problem is problem with write_to_predictions_table. What is Stackdriver reporting? - John Hanley
Can you add more information on your error like stack trace or line number? Also make sure that there's no empty row in your input data set. - ihji
Hi, I have updated the question with a reproducible example. Apologies this wasn't included 1st time around. Having created this example I can see that the behaviour is related to the ability of the dataflow runner to read a pre-existing schema. If I change to a NEW table the local runner fails too with a meaningful error message asking me to provide a schema. Explicitly providing the schema (even although the table already exists) is a working work-around for my original problem. Thanks for your help. - Stewart_R
@JohnHanley Thanks for your help - nothing was written to stackdriver as the process didn't get as far as actually launching the job on GCP. The failure happened when calling .run() - Stewart_R

1 Answer


This issue appears to occur only when no schema is provided to the call to beam.io.WriteToBigQuery. It seems the DirectRunner can fall back on the existing table's schema, but the DataflowRunner cannot.

In the absence of a better answer, we can work around it by explicitly providing the schema.

So, for example, in the minimal example above we could use this:

(bq_rows 
| 'map_to_row' >> beam.Map(to_row)
| 'write_to_table' >> beam.io.WriteToBigQuery(
    'temp_e.words',
    schema={"fields": [{"name": "word", "type": "STRING", "mode": "REQUIRED"}]},
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
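If maintaining a hand-written copy of the schema alongside the table feels fragile, one alternative is to fetch the live schema with the google-cloud-bigquery client and convert it to the dict form that WriteToBigQuery accepts. This is a sketch under assumptions: bq_schema_to_beam is a helper name introduced here (not a Beam API), and the client call assumes default credentials are configured.

```python
def bq_schema_to_beam(fields):
    """Convert BigQuery SchemaField-like objects (anything with .name,
    .field_type and .mode attributes) into the dict form that
    beam.io.WriteToBigQuery accepts as its schema argument."""
    return {
        "fields": [
            {"name": f.name, "type": f.field_type, "mode": f.mode}
            for f in fields
        ]
    }

# Fetching the live schema needs a google-cloud-bigquery client and
# credentials (assumption: application-default credentials), e.g.:
#
#   from google.cloud import bigquery
#   client = bigquery.Client(project='searchlab-data-insights')
#   table = client.get_table('temp_e.words')
#   schema = bq_schema_to_beam(table.schema)
#   # ... then pass schema=schema to beam.io.WriteToBigQuery
```

Note this trades the duplication for a build-time dependency on the table already existing, so it only suits the WRITE_APPEND case described in the question.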