I want to use Cloud Dataflow, Pub/Sub and BigQuery to publish TableRow objects as Pub/Sub messages and then write them to BigQuery.
I want the table name, project ID and dataset ID to be dynamic.
I found the following code on the internet, but I cannot understand how to pass the data row params into it.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.Arrays;
import org.joda.time.Duration;

public void PubSub(String projectId, String datasetId, String tableId, String topicId) {
    PipelineOptions options = PipelineOptionsFactory.create();
    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    dataflowOptions.setStreaming(true);
    Pipeline pipeline = Pipeline.create(dataflowOptions);
    // Read JSON-encoded TableRows from the topic and window them into one-minute batches.
    PCollection<TableRow> input = pipeline
        .apply(PubsubIO.Read.topic(createTopic(projectId, topicId).getName()).withCoder(TableRowJsonCoder.of()))
        .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));
    // Stream each windowed batch into the dynamically chosen table.
    input.apply(BigQueryIO.Write.to(getTableReference(projectId, datasetId, tableId)).withSchema(getSchema()));
    pipeline.run();
}
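
// NOTE: getSchema() wasn't defined in the code I found, so this is just my own
// placeholder to make the snippet compile; the real fields depend on my data.
private static TableSchema getSchema() {
    return new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("name").setType("STRING"),
        new TableFieldSchema().setName("value").setType("INTEGER")));
}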
private static TableReference getTableReference(String projectId, String datasetId, String tableId) {
    TableReference tableRef = new TableReference();
    tableRef.setProjectId(projectId);
    tableRef.setDatasetId(datasetId);
    tableRef.setTableId(tableId);
    return tableRef;
}
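
For context, this is roughly how I was planning to publish each row to the topic. It's only a sketch on my side: I'm assuming the google-cloud-pubsub client library, assuming the message payload has to be the JSON encoding that TableRowJsonCoder expects, and publishRow is just a helper name I made up:

import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class TableRowPublisher {
    // Hypothetical helper: serialize a TableRow to JSON and publish it, so the
    // streaming pipeline above can decode it with TableRowJsonCoder.
    public static void publishRow(String projectId, String topicId, TableRow row) throws Exception {
        Publisher publisher = Publisher.newBuilder(TopicName.of(projectId, topicId)).build();
        try {
            String json = JacksonFactory.getDefaultInstance().toString(row);
            PubsubMessage message = PubsubMessage.newBuilder()
                    .setData(ByteString.copyFromUtf8(json))
                    .build();
            publisher.publish(message).get(); // block until the publish is acknowledged
        } finally {
            publisher.shutdown();
        }
    }
}

Is this the right way to pass the data row params?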
Thanks in advance, Gal