When I submitted my Dataflow job with DataflowRunner (a streaming job with a Pub/Sub source), I made a mistake in the execution parameter that sets the BigQuery table name (say the wrong value points at project-A), and the job threw an error. I then updated the job with the --update flag and the correct table name, but the job threw the same error again, i.e. it was still using project-A as the BigQuery table name.
In short, this is what I did:
- I submitted a Dataflow job:
python main.py \
--job_name=dataflow-job1 \
--runner=DataflowRunner \
--staging_location=gs://project-B-bucket/staging \
--temp_location=gs://project-B-bucket/temp \
--dataset=project-A:table-A
- I got an error because project-A:table-A was not the correct dataset:
{
  "error": {
    "code": 403,
    "message": "Access Denied: Dataset project-A:table-A: User does not have bigquery.datasets.get permission for dataset project-A:table-A.",
    "errors": [
      {
        "message": "Access Denied: Dataset project-A:table-A: User does not have bigquery.datasets.get permission for dataset project-A:table-A.",
        "domain": "global",
        "reason": "accessDenied"
      }
    ],
    "status": "PERMISSION_DENIED"
  }
}
- I updated the job using --update:
python main.py \
--job_name=dataflow-job1 \
--runner=DataflowRunner \
--staging_location=gs://project-B-bucket/staging \
--temp_location=gs://project-B-bucket/temp \
--dataset=project-B:table-B \
--update
- Then I got the same error as before (point 2)
Why does it seem like the old state of the job is still retained? I thought that if Dataflow detects an error in the job, it will not process the pipeline, the Pub/Sub messages will not be ACKed, and the pipeline will be restarted.
Update 2020-12-08: This is how I pass the parameter arguments:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--dataset')
        ...

class WriteToBigQuery(beam.PTransform):
    def __init__(self, name):
        self.name = name

    def expand(self, pcoll):
        return (pcoll
                | 'WriteBQ' >> beam.io.WriteToBigQuery(
                    '{0}.my_table'.format(self.name),
                    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

def run(argv=None, save_main_session=True):
    pipeline_options = PipelineOptions(flags=argv)
    pipeline_options.view_as(StandardOptions).streaming = True
    my_args = pipeline_options.view_as(MyOptions)
    ...
    with beam.Pipeline(options=pipeline_options) as p:
        ...
        # I wrapped the BQ write component inside a PTransform class
        output | 'WriteBQ' >> WriteToBigQuery(my_args.dataset)
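To check which dataset value the pipeline graph actually gets built with when I (re)submit, I can log the parsed option just before constructing the pipeline. This is only a minimal sketch based on the run() above (it assumes the same MyOptions class; the logging call is purely for illustration and is not part of my actual job):

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

def run(argv=None, save_main_session=True):
    pipeline_options = PipelineOptions(flags=argv)
    pipeline_options.view_as(StandardOptions).streaming = True
    my_args = pipeline_options.view_as(MyOptions)

    # Illustration only: log the dataset the graph is about to be built with,
    # so it can be compared with the --dataset flag passed on the command line
    # (expected: project-B:table-B after the --update submission).
    logging.info('Building pipeline with dataset=%s', my_args.dataset)

    with beam.Pipeline(options=pipeline_options) as p:
        ...  # rest of the pipeline, as in the snippet above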