
When I submitted my Dataflow job with DataflowRunner (a streaming job with a Pub/Sub source), I made a mistake in an execution parameter: I defined the wrong BQ table name (let's say the wrong value is project-A), and the job threw an error. I then updated the job with the --update flag and the correct table name, but the job threw the same error again, i.e. it told me it was still using project-A as the BQ table name.

In short, this is what I did:

  1. I submitted a Dataflow job:
python main.py \
 --job_name=dataflow-job1 \
 --runner=DataflowRunner \
 --staging_location=gs://project-B-bucket/staging \
 --temp_location=gs://project-B-bucket/temp \
 --dataset=project-A:table-A
  2. I got an error because project-A:table-A was not the correct dataset:
{
  "error": {
    "code": 403,
    "message": "Access Denied: Dataset project-A:table-A: User does not have bigquery.datasets.get permission for dataset project-A:table-A.",
    "errors": [
      {
        "message": "Access Denied: Dataset project-A:table-A: User does not have bigquery.datasets.get permission for dataset project-A:table-A.",
        "domain": "global",
        "reason": "accessDenied"
      }
    ],
    "status": "PERMISSION_DENIED"
  }
}
  3. I updated the job using --update:
python main.py \
 --job_name=dataflow-job1 \
 --runner=DataflowRunner \
 --staging_location=gs://project-B-bucket/staging \
 --temp_location=gs://project-B-bucket/temp \
 --dataset=project-B:table-B \
 --update
  4. Then I got the same error as before (point 2).

Why does it seem like the old state of the job was retained? I thought that if Dataflow detected an error in the job, it would not process the pipeline, the Pub/Sub messages would not be ACKed, and the pipeline would restart.

Update 2020-12-08: This is how I pass the parameter arguments:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--dataset')
...
class WriteToBigQuery(beam.PTransform):
    def __init__(self, name):
        self.name = name

    def expand(self, pcoll):
        return (pcoll
                | 'WriteBQ' >> beam.io.WriteToBigQuery(
            '{0}.my_table'.format(self.name),
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
def run(argv=None, save_main_session=True):
    pipeline_options = PipelineOptions(flags=argv)
    pipeline_options.view_as(StandardOptions).streaming = True
    my_args = pipeline_options.view_as(MyOptions)
    ...
    with beam.Pipeline(options=pipeline_options) as p:
        ...
        # I wrapped the BQ write component inside a PTransform class
        output | 'WriteBQ' >> WriteToBigQuery(my_args.dataset)
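One way to see what happens with the code above: the --dataset option is formatted into a plain table-spec string on the client at graph-construction time, so the value travels inside the submitted job rather than being re-read later. Below is a minimal stand-in sketch of that resolution step, using argparse instead of Beam's PipelineOptions (build_table_spec is a hypothetical helper, not part of the original code):

```python
import argparse

def build_table_spec(argv):
    """Mimics how MyOptions resolves --dataset when the pipeline graph is built."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--dataset')
    args, _ = parser.parse_known_args(argv)
    # The table spec becomes a plain string here, on the client, before the
    # job is submitted; whatever value was passed at submission is baked in.
    return '{0}.my_table'.format(args.dataset)

# The first submission bakes project-A into the graph:
print(build_table_spec(['--dataset', 'project-A:table-A']))  # project-A:table-A.my_table
```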
Can you show how you pass the pipeline option to the BQ transform? - Pablo
@Pablo I have updated my question with the code I use to pass the pipeline option - tombrawz

1 Answer


You can't change the pipeline options when you update a Dataflow streaming job; --update only replaces the transforms of your pipeline. To apply a new --dataset value, drain (or cancel) the job and submit it again as a new job.
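To illustrate the remedy, here is a hedged sketch that only builds the commands you would run: drain the bad job, then resubmit with the corrected dataset. The region value is an assumption for illustration, and the resubmit command omits the staging/temp flags from the question for brevity:

```python
def drain_command(job_id, region):
    # `gcloud dataflow jobs drain` stops pulling new Pub/Sub messages and
    # flushes in-flight data before the job terminates.
    return ['gcloud', 'dataflow', 'jobs', 'drain', job_id, '--region', region]

def resubmit_command(dataset):
    # A fresh submission (no --update), so the new --dataset value is used
    # when the pipeline graph is constructed.
    return ['python', 'main.py',
            '--job_name=dataflow-job1',
            '--runner=DataflowRunner',
            '--dataset={0}'.format(dataset)]

print(' '.join(drain_command('dataflow-job1', 'us-central1')))
print(' '.join(resubmit_command('project-B:table-B')))
```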