In a Dataflow/Apache Beam pipeline, I would like to read from a source, write to a sink, then read from another source and write again, in that order. How do I enforce the order R->W->R->W in the code below? I believe it currently runs as two parallel R->W branches. I am not sure whether this can be achieved using the PDone object.

(In the example below, BIGQUERYVIEWB is a BigQuery view built from TESTDATASET1.TABLE2 and a few other tables.)

//Read 1
PCollection<TableRow> tr = pipeline.apply(BigQueryIO.readTableRows().fromQuery("SELECT ID FROM `TESTDATASET1.BIGQUERYVIEWA`").usingStandardSql());
PCollection<TableRow> tr1= tr.apply(ParDo.of(new SomeFn()));
//Write 1
tr1.apply(BigQueryIO.writeTableRows().withoutValidation()
                    .withSchema(FormatRemindersFn.getSchema())
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .to("TESTDATASET1.TABLE2"));
//Read 2
PCollection<TableRow> tr2 = pipeline.apply(BigQueryIO.readTableRows().fromQuery("SELECT ID FROM `TESTDATASET1.BIGQUERYVIEWB`").usingStandardSql());
PCollection<TableRow> tr3 = tr2.apply(ParDo.of(new SomeFn()));
//Write 2
tr3.apply(BigQueryIO.writeTableRows().withoutValidation()
                    .withSchema(FormatRemindersFn.getSchema())
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .to("TESTDATASET1.TABLE3"));
Why do you want to do this? If there's no dependency, just treat them as separate pipelines. - Graham Polley
There is a dependency; the code above is just a sample snippet, not the actual code. Say the first write inserts into a BigQuery table. The second read would then query a BigQuery view that involves a few other tables along with the table just written to. So I do want a dependency. - Roshan Fernando
If there is a dependency, then you need to declare it in the pipeline, i.e. setC should follow from setA; you could just pass metadata around. It still would not work well, though, because you are not using the "data flow" approach, in which small data items flow through the pipeline. This sounds more like process orchestration. - de1
Your example doesn't show a dependency. Please update it or share your actual code. - Graham Polley
Updated the code. - Roshan Fernando

1 Answer

Take a look at the Wait.on() transform, which lets you set up dependent steps.

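Here is a simple pseudocode example:

PCollection<String> data = ...;

// First step; its output also serves as the completion signal.
PCollection<Something> first = data.apply(ParDo.of(..));

// Second step: processing of data is delayed until first has finished (per window).
data.apply(Wait.on(first)).apply(ParDo.of(..));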

Note that Wait.on(..) requires a PCollection as its signal, not a PDone. I believe BigQueryIO.Write returns a WriteResult, from which you can extract a PCollection of failed inserts (via getFailedInserts()) to use as that signal.