0
votes

I have 2 questions on my development.

Question 1

I'm trying to create a template from a python code which consists of reading from BigQuery tables, apply some transformations and write in a different BigQuery table (which can exists or not).

The point is that I need to send the target table as parameter, but looks that I can't use parameters in the pipeline method WriteToBigQuery as it is raising the following error message: apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: project_target, type: str, default_value: 'Test').get() not called from a runtime context

Approach 1

with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Delete previous data" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....
            | 'Write table 2' >> Write(WriteToBigQuery(
               table=custom_options.project_target.get() + ":" + custom_options.dataset_target.get() + "." + custom_options.table_target.get(),
        schema=custom_options.target_schema.get(),
        write_disposition=BigQueryDisposition.WRITE_APPEND,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)

Approach 2

I created a ParDo function in order to get there the variable and set the WriteToBigQuery method. However, despite of having the pipeline execution completed sucessfully and seeing that the output is returning rows (theoretically written), I can't see the table nor data inserted on it.

    with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Pre-tasks" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....

            | 'Write table 2' >> Write(WriteToBigQuery())

Where I tried with 2 methods and none works: BigQueryBatchFileLoads and WriteToBigQuery

class writeTable(beam.DoFn):
def process(self, element):
    try:
        #Load first here the parameters from the custom_options variable (Here we can do it)

        result1 = Write(BigQueryBatchFileLoads(destination=target_table,
                                     schema=target_schema,
                                     write_disposition=BigQueryDisposition.WRITE_APPEND,
                                     create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))
        
        result2 = WriteToBigQuery(table=target_table,
                        schema=target_schema,
                        write_disposition=BigQueryDisposition.WRITE_APPEND,
                        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                        method="FILE_LOADS"
                        ) 

Question 2

Other doubt I have is if in this last ParDo class, I need to return something as the element or result1 or result2 as we are in the last pipeline step.

Appreciate your help on this.

1
* More details about the successful execution: See the below link to see that the pipeline execution in the scenario 2 is working fine and it's returning rows, however the table nor data is available in BigQuery. i.stack.imgur.com/GCueP.pngDavid
* More details about the approach 2: I read somewhere I need to do the following step, but not sure how to do it: "Once you move it out of the DoFn, you need to apply the PTransform beam.io.gcp.bigquery.WriteToBigQuery to a PCollection for it to have any effect". Is that correct?David

1 Answers

0
votes

The most advisable way to do this is similar to #1, but passing the value provider without calling get, and passing a lambda for table:

with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Delete previous data" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....
            | 'Write table 2' >> WriteToBigQuery(
               table=lambda x: custom_options.project_target.get() + ":" + custom_options.dataset_target.get() + "." + custom_options.table_target.get(),
        schema=custom_options.target_schema,
        write_disposition=BigQueryDisposition.WRITE_APPEND,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)

This should work.