0
votes

Can I insert data to different bigQuery datasets according to data I treat in a previous dataflow step?


I am creating a dataflow pipeline, it's reading from PubSub subscription and writing to big query table. It is defined as follow:

def run(argv=None, save_main_session=True):
    options: PipelineOptions = PipelineOptions(
        project='project-id',
        runner='DataflowRunner',
        region='region',
        streaming=True,
        setup_file='dataflow/setup.py',
        autoscaling_algorithm='THROUGHPUT_BASED',
        job_name='telemetry-processing'
    )

    with beam.Pipeline(options=options) as p:
        status = (
                p
                 | 'Get Status PubSub' >> beam.io.ReadFromPubSub(
            subscription='projects/project-id/subscriptions/subscription-id',
            with_attributes=True))

        status_records = (status| 'Proto to Dict' >> beam.Map(lambda x: 
convert_proto_to_dict(x, nozzle_status_proto.NozzleStatus)) )

        status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project- 
id:dataset-id.table-id')

         bytes_status = (status | 'Get Bytes Result' >> beam.ParDo(GetBytes()))
         bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery(
        'project-id:dataset-id.backup-table-id')

It is working exactly as expected for given input and output.
What I want is, regarding a particular attribute I have in my PubSubMessage, to define on which dataset my message should go. So the part I need to change is this one:

status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:dataset-id.table-id')

I already tried to extract the needed data and use it like this:

status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:{data-from-previous-step}.table-id')

But we can't get data from a PCollection directly.

I tried to overwrite WriteToBigQuery as in this post (How can I write to Big Query using a runtime value provider in Apache Beam?) but I got no error and nothing insert.

I do not see how to achieve this.
Do you know where I should start to do this?
Do I have to create n pipeline(s) for n dataset?

1
This should be possible. Example: github.com/pabloem/beam/blob/…Peter Kim

1 Answers

1
votes

The "table" parameter of WriteToBigQuery can be a function from element to the table it should be written to. For example:

status_records | 'Write' >> beam.io.WriteToBigQuery(
  lambda e: 'dataset1.invalid_records' if is_invalid(e) else 'dataset2.good_records')