
I am trying to insert data from Cloud Storage into BigQuery using Dataflow (Java). I can batch-load the data; however, I want to set up a streaming load instead, so that as new objects are added to my bucket, they get pushed to BigQuery.

I have set the PipelineOptions to streaming, and the GCP Console UI shows the Dataflow pipeline as streaming type. My initial set of files/objects in the bucket gets pushed to BigQuery.

But as I add new objects to my bucket, these do not get pushed to BigQuery. Why is that? How can I push objects that are added to my Cloud Storage bucket to BigQuery using a streaming Dataflow pipeline?

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Specify PipelineOptions
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject(<project-name>);
options.setStagingLocation(<bucket/staging folder>);
options.setStreaming(true);
options.setRunner(DataflowRunner.class);

My assumption was that, because this is a streaming pipeline, objects added to Cloud Storage would get pushed to BigQuery automatically.

Any suggestions?


1 Answer


How do you create your input collection? For a streaming pipeline to stay running, it needs an unbounded input. With a bounded source, such as a one-time read of the objects in your bucket, the pipeline finishes as soon as that input is processed; it will use streaming inserts while it runs, but it will not pick up new files. You can get an unbounded input by reading from a Pub/Sub subscription that receives a notification for every change in your bucket; see https://cloud.google.com/storage/docs/pubsub-notifications for details.
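
Here is a minimal sketch of that approach, assuming Beam 2.2+ (for FileIO.matchAll/TextIO.readFiles), a subscription already wired to the bucket's notification topic, and a hypothetical two-column CSV line format; the subscription path, table name, and parse logic are placeholders you would replace with your own:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("<project-name>");
options.setStagingLocation("<bucket/staging folder>");
options.setStreaming(true);
options.setRunner(DataflowRunner.class);

Pipeline p = Pipeline.create(options);

p
    // Unbounded source: one message per change in the bucket. GCS notifications
    // carry the bucket and object name as message attributes.
    .apply("ReadNotifications", PubsubIO.readMessagesWithAttributes()
        .fromSubscription("projects/<project-name>/subscriptions/<subscription>"))
    // Turn each notification into the gs:// path of the new object.
    .apply("ToGcsPath", MapElements.into(TypeDescriptors.strings())
        .via((PubsubMessage msg) -> String.format("gs://%s/%s",
            msg.getAttribute("bucketId"), msg.getAttribute("objectId"))))
    // Expand each path and read the matched file line by line.
    .apply(FileIO.matchAll())
    .apply(FileIO.readMatches())
    .apply(TextIO.readFiles())
    // Hypothetical parse: assumes each line is a two-column CSV "name,score".
    .apply("ParseLine", MapElements.into(TypeDescriptor.of(TableRow.class))
        .via((String line) -> {
          String[] parts = line.split(",", 2);
          return new TableRow().set("name", parts[0]).set("score", parts[1]);
        }))
    // In a streaming pipeline BigQueryIO uses streaming inserts by default.
    // CREATE_NEVER assumes the destination table already exists.
    .apply(BigQueryIO.writeTableRows()
        .to("<project-name>:<dataset>.<table>")
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

p.run();

The notification config itself is created once, outside the pipeline, e.g. with gsutil notification create -t <topic> -f json gs://<bucket>, and then a subscription on that topic. As an alternative design on Beam 2.2+, TextIO.read().from(...).watchForNewFiles(...) can poll the bucket for new files without involving Pub/Sub at all.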