Problem statement: We are customizing the Google-provided PubSubToBQ Dataflow streaming Java template so that it can be configured with multiple subscriptions/topics, each pushing data into its own BigQuery table, all running as a single Dataflow pipeline that reads every stream from the source and writes to the corresponding BigQuery table. When we execute the template from Eclipse we have to pass the subscription/topic and BQ details, and the template is staged on a GCS bucket; but when we then run the staged template with the gcloud command and different subscription and BQ details, the Dataflow job is not overridden with the new subscriptions or BQ tables.
Objective: My objective is to use the Google-provided PubSubTOBQ.java template, pass a list of subscriptions with their corresponding BigQuery tables, and create one read/write branch per subscription-table pair. So n-to-n: n branches in a single job.
I am using the Google-provided PubSubTOBQ.java template, which takes as input a single subscription or a single topic and the corresponding BigQuery table detail.
Now I need to customize this to take a comma-separated list of topics or a list of subscriptions as input. I am able to do this using ValueProvider<List<String>>, and inside the main/run method I iterate through the array of strings, passing each subscription/topic and BQ table as a String. Look at the code below for more information, and at the sketch of the options interface after this question.
What I read in the GCP docs is that we cannot access ValueProvider variables outside a DoFn if we want the values to be resolved at runtime to create a dynamic pipeline. I am not sure whether we can instead read messages inside a DoFn, e.g.:
**PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i])**
If yes, please let me know, so that my objective can be achieved.
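For reference, here is a minimal sketch of the options interface the code below assumes. The getter names match the call sites in main(), but the annotations and exact declarations are my assumption, not the template's actual code:

import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ValueProvider;

// Hypothetical options interface matching the getters used in main() below.
public interface StreamingDataflowOptions extends DataflowPipelineOptions {

    @Description("Comma-separated list of Pub/Sub subscriptions to read from")
    ValueProvider<List<String>> getInputSubscription();
    void setInputSubscription(ValueProvider<List<String>> value);

    @Description("Comma-separated list of Pub/Sub topics to read from")
    ValueProvider<List<String>> getInputTopic();
    void setInputTopic(ValueProvider<List<String>> value);

    @Description("Comma-separated list of output BigQuery table specs (project:dataset.table)")
    ValueProvider<List<String>> getOutputTableSpec();
    void setOutputTableSpec(ValueProvider<List<String>> value);

    @Description("If true, read from subscriptions; otherwise read from topics")
    @Default.Boolean(true)
    Boolean getUseSubscription();
    void setUseSubscription(Boolean value);
}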
Code:
public static void main(String[] args) {
    StreamingDataflowOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(StreamingDataflowOptions.class);

    String[] providedSubscriptionArray = null;
    String[] providedTopicArray = null;
    String[] providedTableArray = null;

    // isAccessible() is only true when the value is available at graph-construction
    // time; for a staged template run with new runtime parameters these branches
    // are skipped, which is why the staged job cannot be overridden.
    if (options.getInputSubscription().isAccessible()) {
        providedSubscriptionArray = createListOfProvidedStringArray(options.getInputSubscription().get());
    }
    if (options.getInputTopic().isAccessible()) {
        providedTopicArray = createListOfProvidedStringArray(options.getInputTopic().get());
    }
    if (options.getOutputTableSpec().isAccessible()) {
        providedTableArray = createListOfProvidedStringArray(options.getOutputTableSpec().get());
    }

    Pipeline pipeline = Pipeline.create(options);

    // Iterate over whichever source list is in use (the original loop always used
    // the subscription array's length, which fails when reading from topics).
    String[] sources = options.getUseSubscription() ? providedSubscriptionArray : providedTopicArray;
    for (int i = 0; i < sources.length; i++) {
        /*
         * Step #1: Read PubsubMessages for this branch.
         */
        PCollection<PubsubMessage> readPubSubMessage;
        if (options.getUseSubscription()) {
            readPubSubMessage = pipeline.apply("Read From Subscription " + i,
                    PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i]));
        } else {
            readPubSubMessage = pipeline.apply("Read From Topic " + i,
                    PubsubIO.readMessagesWithAttributes().fromTopic(providedTopicArray[i]));
        }
        readPubSubMessage
                /*
                 * Step #2: Transform the PubsubMessages into TableRows
                 * (step names carry the index so every branch is unique).
                 */
                .apply("Convert Message To TableRow " + i, ParDo.of(new PubsubMessageToTableRow()))
                .apply("Insert Data To BigQuery " + i,
                        BigQueryIO.writeTableRows().to(providedTableArray[i])
                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    }
    pipeline.run().waitUntilFinish();
}
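The helper createListOfProvidedStringArray is not shown above; a minimal sketch consistent with how it is called (a List<String> in, a plain String[] out) could look like the following, with the trimming behavior being my assumption:

// Hypothetical helper matching the call sites above: converts the parsed list
// into a String[] so the loop can index it, trimming stray whitespace that a
// comma-separated parameter value may carry.
private static String[] createListOfProvidedStringArray(List<String> providedList) {
    String[] result = new String[providedList.size()];
    for (int i = 0; i < providedList.size(); i++) {
        result[i] = providedList.get(i).trim();
    }
    return result;
}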
The goal is for a single Dataflow PubSubTOBQ template to fan out into multiple branches, one per subscription and its corresponding BigQuery table, within a single streaming Dataflow job.
