I am consuming messages from Pub/Sub in a streaming Dataflow pipeline. I then need to upload the raw data to Cloud Storage, process it, and upload the result to BigQuery.
Here is my code:
public class BotPipline {
    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setProject(MY_PROJECT);
        options.setStagingLocation(MY_STAGING_LOCATION);
        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);

        PCollection<String> input = pipeline.apply(PubsubIO.Read.maxNumRecords(1).subscription(MY_SUBSCRIBTION));
        input.apply(TextIO.Write.to(MY_STORAGE_LOCATION));
        input
            .apply(someDataProcessing(...).named("update json"))
            .apply(convertToTableRow(...).named("convert json to table row"))
            .apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema));

        pipeline.run();
    }
}
When I run the code with the write to storage commented out, it works well, but when I try uploading to BigQuery I get this error (which is expected):
Write can only be applied to a Bounded PCollection
I am not using a bounded collection since I need this to run all the time, and I need the data to be uploaded immediately. Any solution?
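What I am after is writing each message to its own object myself rather than through TextIO. A minimal self-contained sketch of that per-element write (a local file stands in for the GCS object, and `writeElement`/`roundTrip` are hypothetical helpers of mine, not SDK APIs — in the real pipeline this logic would sit inside a DoFn and use the Cloud Storage client):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class PerElementWriter {

    // In the real pipeline this would live inside a DoFn's processElement(),
    // and the write would go to GCS via the Cloud Storage client; a local
    // file stands in for the GCS object here.
    public static String writeElement(Path dir, String element) {
        String fileName = UUID.randomUUID().toString(); // one object per message
        try {
            Files.write(dir.resolve(fileName), element.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return fileName; // passed downstream so the BQ row can reference it
    }

    // Write one element to a temp directory and read it back.
    public static String roundTrip(String element) {
        try {
            Path dir = Files.createTempDirectory("bot-pipeline");
            String name = writeElement(dir, element);
            return new String(Files.readAllBytes(dir.resolve(name)), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("{a:1, b:2}")); // prints {a:1, b:2}
    }
}
```

Returning the generated file name from the write step is the point: it is what lets the later BigQuery row link back to the stored object.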
EDIT: this is my desired behavior:
I am receiving messages via Pub/Sub. Each message should be stored in its own file in GCS as raw data, then some processing is executed on the data, and the result is saved to BigQuery with the file name included in the data.
The data should be visible in BQ immediately after it is received. Example:
data published to Pub/Sub: {a:1, b:2}
data saved to a GCS file with UUID name: A1F432
data processing: {a:1, b:2} -> {a:11, b:22} -> {fileName: A1F432, data: {a:11, b:22}}
data in BQ: {fileName: A1F432, data: {a:11, b:22}}
The idea is that the processed data is stored in BQ with a link to the raw data stored in GCS.
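The processing and wrapping steps above can be sketched in plain Java. The `process` body is only a placeholder standing in for someDataProcessing(...), chosen just to reproduce the {a:1, b:2} -> {a:11, b:22} example, and the output is built as a plain string rather than a TableRow:

```java
public class WrapWithFileName {

    // Placeholder for someDataProcessing(...): reproduces the example
    // {a:1, b:2} -> {a:11, b:22}. The real transform is whatever the
    // pipeline's "update json" step actually does.
    public static String process(String rawJson) {
        return rawJson.replace("1", "11").replace("2", "22"); // illustrative only
    }

    // Attach the GCS file name to the processed record, matching
    // {fileName: A1F432, data: {a:11, b:22}}.
    public static String wrap(String fileName, String processedJson) {
        return "{fileName: " + fileName + ", data: " + processedJson + "}";
    }

    public static void main(String[] args) {
        System.out.println(wrap("A1F432", process("{a:1, b:2}")));
        // prints {fileName: A1F432, data: {a:11, b:22}}
    }
}
```

In the pipeline, `wrap` corresponds to the step just before "convert json to table row": it needs the file name produced by the GCS write, which is why that name has to travel through the processing steps alongside the data.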