
I want to use Cloud Dataflow, Pub/Sub and BigQuery to write TableRows as Pub/Sub messages and then write them to BigQuery. I want the table name, project ID and dataset ID to be dynamic.
I found the following code on the internet, and I cannot understand how to pass the dynamic table parameters.

public void PubSub(String projectId, String datasetId, String tableId, String topicId) {
    PipelineOptions options = PipelineOptionsFactory.create();
    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    dataflowOptions.setStreaming(true);
    Pipeline pipeline = Pipeline.create(dataflowOptions);

    // Read TableRow-encoded messages from the Pub/Sub topic and window them.
    PCollection<TableRow> input = pipeline
            .apply(PubsubIO.Read.topic(createTopic(projectId, topicId).getName())
                    .withCoder(TableRowJsonCoder.of()))
            .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));

    // Write the rows to the table built from the given project/dataset/table IDs.
    input.apply(BigQueryIO.Write.to(getTableReference(projectId, datasetId, tableId))
            .withSchema(getSchema()));

    pipeline.run();
}


// Build a TableReference from the dynamic project, dataset and table IDs.
private static TableReference getTableReference(String projectId, String datasetId, String tableId) {
      TableReference tableRef = new TableReference();
      tableRef.setProjectId(projectId);
      tableRef.setDatasetId(datasetId);
      tableRef.setTableId(tableId);
      return tableRef;
}

Thanks in advance, Gal


1 Answer


The BigQueryIO.Write transform does not support dynamic table destinations, but you can make BigQuery API calls directly from a DoFn.

With this, you can set the table name to anything you want, as computed by your code. This could be looked up from a side input, or computed directly from the element the DoFn is currently processing.
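Here is a minimal sketch of that approach, assuming the Dataflow 1.x SDK you are already using plus the google-api-services-bigquery client. The class name, the "events_" prefix and the "type" field are just placeholders for however you want to compute the table name from the element, and newBigQueryClient() is one possible way to build an authorized client:

import java.util.Collections;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

// Sketch: stream each TableRow into a table whose name is computed from the element itself.
class DynamicTableWriteFn extends DoFn<TableRow, Void> {
    private final String projectId;
    private final String datasetId;
    private transient Bigquery bigquery;

    DynamicTableWriteFn(String projectId, String datasetId) {
        this.projectId = projectId;
        this.datasetId = datasetId;
    }

    @Override
    public void startBundle(Context c) throws Exception {
        // Build the (non-serializable) BigQuery client once per bundle.
        bigquery = newBigQueryClient();
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow row = c.element();
        // Placeholder rule: derive the destination table from a field of the element.
        String tableId = "events_" + row.get("type");

        TableDataInsertAllRequest request = new TableDataInsertAllRequest()
                .setRows(Collections.singletonList(
                        new TableDataInsertAllRequest.Rows().setJson(row)));

        // Streaming insert into the dynamically chosen table.
        bigquery.tabledata().insertAll(projectId, datasetId, tableId, request).execute();
    }

    // One way to build an authorized client using application default credentials.
    private static Bigquery newBigQueryClient() throws Exception {
        GoogleCredential credential = GoogleCredential.getApplicationDefault()
                .createScoped(BigqueryScopes.all());
        return new Bigquery.Builder(
                        GoogleNetHttpTransport.newTrustedTransport(),
                        JacksonFactory.getDefaultInstance(),
                        credential)
                .setApplicationName("dynamic-table-writer")
                .build();
    }
}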

To avoid making too many small calls to BigQuery, you can batch up the requests in finishBundle().
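A rough way to do that, building on the sketch above (same imports and the same hypothetical newBigQueryClient() helper), is to buffer rows per destination table in processElement and flush them once per bundle:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: buffer rows per table and flush them in finishBundle() to reduce API calls.
class BatchedTableWriteFn extends DoFn<TableRow, Void> {
    private final String projectId;
    private final String datasetId;
    private transient Bigquery bigquery;
    private transient Map<String, List<TableDataInsertAllRequest.Rows>> buffered;

    BatchedTableWriteFn(String projectId, String datasetId) {
        this.projectId = projectId;
        this.datasetId = datasetId;
    }

    @Override
    public void startBundle(Context c) throws Exception {
        bigquery = newBigQueryClient();   // hypothetical helper, see the previous sketch
        buffered = new HashMap<>();
    }

    @Override
    public void processElement(ProcessContext c) {
        TableRow row = c.element();
        String tableId = "events_" + row.get("type");   // placeholder naming rule
        buffered.computeIfAbsent(tableId, t -> new ArrayList<>())
                .add(new TableDataInsertAllRequest.Rows().setJson(row));
    }

    @Override
    public void finishBundle(Context c) throws Exception {
        // One insertAll call per destination table for the whole bundle.
        for (Map.Entry<String, List<TableDataInsertAllRequest.Rows>> entry : buffered.entrySet()) {
            TableDataInsertAllRequest request =
                    new TableDataInsertAllRequest().setRows(entry.getValue());
            bigquery.tabledata().insertAll(projectId, datasetId, entry.getKey(), request).execute();
        }
        buffered.clear();
    }
}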

I don't completely understand whether you want Dataflow to write to Pub/Sub and then have Pub/Sub feed BigQuery. If the rows are already in your pipeline, you could just write them directly to BigQuery without going through Pub/Sub.