
I have an Apache Beam pipeline in Python that, for whatever reason, has a flow like the one below.

from google.cloud import bigquery
import apache_beam as beam

client = bigquery.Client()

# First SQL job: must finish before the Beam pipeline starts.
query_job1 = client.query('create table sample_table_1 as select * from table_1')
result1 = query_job1.result()

with beam.Pipeline(options=options) as p:

    records = (
            p
            | 'Data pull' >> beam.io.Read(beam.io.BigQuerySource(...))
            | 'Transform' >> ....
            | 'Write to BQ' >> beam.io.WriteToBigQuery(...)
    )

# Second SQL job: depends on the table written by the pipeline above.
query_job2 = client.query('create table sample_table_2 as select * from table_2')
result2 = query_job2.result()

The intended order is: SQL job --> Dataflow pipeline --> SQL job

This sequence works fine when I run it locally. However, when I run it as a Dataflow pipeline, the steps are not executed in this order.

Is there a way to force these dependencies when running on Dataflow?

In most cases you won't need to use the BigQuery client to create a table. Instead, you can set create_disposition=CREATE_IF_NEEDED to let Dataflow create the table before writing to it (see the sketch after these comments). If this doesn't work for you, it would help to know what query_job1 and query_job2 are for. beam.apache.org/releases/pydoc/2.23.0/… – Peter Kim
The Dataflow pipeline creates a table that is then used by query_job2. How can I set dependencies in such a scenario? – Bob
I can do something like the following, but I would like to run Pipeline1 first and, once it is done, trigger Pipeline2. Pipeline1: Read_from_BQ --> Process --> Write_to_BigQuery. Pipeline2: Read_from_BQ (the SQL here uses the table created by Pipeline1) --> Process --> Write_to_BigQuery. – Bob
There is no way to achieve that without depending on another scheduling system such as Composer/Airflow. – Peter Kim
@Bob, I have posted the answer as Community Wiki (from which I do not get any reputation), since Peter Kim was right. If you found the information useful, I would appreciate it if you accepted and upvoted the answer. – Alexandre Moraes
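
A minimal sketch of the create_disposition suggestion from the first comment, assuming options is the same PipelineOptions object used in the question; the project, table, schema, and query are placeholders:

import apache_beam as beam

with beam.Pipeline(options=options) as p:
    (
        p
        | 'Data pull' >> beam.io.Read(beam.io.BigQuerySource(
            query='select * from my_dataset.table_1', use_standard_sql=True))
        | 'Write to BQ' >> beam.io.WriteToBigQuery(
            'my-project:my_dataset.sample_table_1',
            schema='col_a:STRING,col_b:INTEGER',
            # CREATE_IF_NEEDED lets the sink create the output table itself,
            # removing the need for a separate CREATE TABLE query job.
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    )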

1 Answer


As @PeterKim mentioned, the processing flow you described in the comments cannot be achieved with Dataflow alone; the Dataflow programming model currently does not support dependencies between separate jobs.

You can use Cloud Composer to orchestrate sequential job executions that depend on one another, for example with a DAG like the sketch below.
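
This is a minimal Composer (Airflow) DAG sketch, not a definitive implementation: it assumes the Google and Apache Beam Airflow providers are installed, and the project, bucket, dataset, table names, and pipeline file path are all placeholders (operator parameters may differ across provider versions). It chains the three steps so that each starts only after the previous one has finished.

from datetime import datetime

from airflow import models
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with models.DAG(
    dag_id='bq_dataflow_bq',
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:

    # Step 1: first SQL job (the equivalent of query_job1 in the question).
    create_table_1 = BigQueryInsertJobOperator(
        task_id='create_table_1',
        configuration={
            'query': {
                'query': 'CREATE TABLE my_dataset.sample_table_1 AS SELECT * FROM my_dataset.table_1',
                'useLegacySql': False,
            }
        },
    )

    # Step 2: run the Beam pipeline on Dataflow (placeholder GCS path to the pipeline file).
    run_beam_pipeline = BeamRunPythonPipelineOperator(
        task_id='run_beam_pipeline',
        py_file='gs://my-bucket/pipeline.py',
        runner='DataflowRunner',
        pipeline_options={
            'project': 'my-project',
            'region': 'us-central1',
            'temp_location': 'gs://my-bucket/tmp',
        },
    )

    # Step 3: second SQL job, which reads the table written by the pipeline.
    create_table_2 = BigQueryInsertJobOperator(
        task_id='create_table_2',
        configuration={
            'query': {
                'query': 'CREATE TABLE my_dataset.sample_table_2 AS SELECT * FROM my_dataset.table_2',
                'useLegacySql': False,
            }
        },
    )

    # Explicit ordering: SQL job --> Dataflow pipeline --> SQL job.
    create_table_1 >> run_beam_pipeline >> create_table_2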