I am reading data from Pub/Sub in a streaming Dataflow pipeline. I then need to upload the raw data to Cloud Storage, process it, and upload the result to BigQuery.

Here is my code:

public class BotPipline {

public static void main(String[] args) {

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject(MY_PROJECT);
    options.setStagingLocation(MY_STAGING_LOCATION);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> input = pipeline.apply(PubsubIO.Read.maxNumRecords(1).subscription(MY_SUBSCRIBTION));

    input.apply(TextIO.Write.to(MY_STORAGE_LOCATION));

    input
        .apply(someDataProcessing(...).named("update json"))
        .apply(convertToTableRow(...).named("convert json to table row"))
        .apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema));
    pipeline.run();
}

}

When I comment out the write to storage, the code works well, but with the write enabled I get this error (which is expected):

Write can only be applied to a Bounded PCollection

I am not using a bounded collection because the pipeline needs to run continuously and the data must be uploaded immediately. Is there a solution?

EDIT: this is my desired behavior:

I am receiving messages via Pub/Sub. Each message should be stored in its own file in GCS as raw data, then processed, and then saved to BigQuery with the file name included in the data.

Data should be visible in BQ immediately after it is received. Example:

data published to pubsub : {a:1, b:2} 
data saved to GCS file UUID: A1F432 
data processing :  {a:1, b:2} -> 
                   {a:11, b: 22} -> 
                   {fileName: A1F432, data: {a:11, b: 22}} 
data in BQ : {fileName: A1F432, data: {a:11, b: 22}} 

The idea is that the processed data is stored in BQ with a link to the raw data stored in GCS.

1 Answer

Currently we don't support writing unbounded collections in TextIO.Write. See related question.

Could you clarify what you would like the behavior of an unbounded TextIO.Write to be? For example, would you like one constantly growing file, or one file per window that is closed when the window closes, or something else? Or does it only matter that the files will eventually contain all the Pub/Sub messages, regardless of how the files are structured?

As a workaround, you can implement writing to GCS as your own DoFn, using IOChannelFactory to interact with GCS (in fact, TextIO.Write is, under the hood, just a composite transform that a user could have written themselves from scratch).
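To make the workaround above more concrete, here is a minimal sketch of the per-message logic such a DoFn could call, matching the behavior described in the question's edit: write each raw message to its own file named by a UUID, then attach that file name to the processed record. Everything here is hypothetical: `RawMessageWriter`, `ChannelOpener`, `writeRaw`, `withFileName`, and `localOpener` are names invented for illustration, and the local-filesystem opener is only a stand-in so the sketch runs without GCS. In a real pipeline the opener would presumably delegate to the SDK's IOChannelFactory with a `gs://` path, which is not shown.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

/** Hypothetical helper holding the per-element logic a custom DoFn could call. */
class RawMessageWriter {

    /** Assumed abstraction over "open a writable channel for this file name". */
    interface ChannelOpener {
        WritableByteChannel open(String fileName) throws IOException;
    }

    private final ChannelOpener opener;

    RawMessageWriter(ChannelOpener opener) {
        this.opener = opener;
    }

    /** Writes one message to its own file and returns the generated file name. */
    String writeRaw(String message) throws IOException {
        String fileName = UUID.randomUUID().toString();
        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
        try (WritableByteChannel channel = opener.open(fileName)) {
            // A channel may accept fewer bytes than offered, so loop until drained.
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
        }
        return fileName;
    }

    /** Wraps already-processed JSON together with the name of the raw file. */
    static String withFileName(String fileName, String processedJson) {
        return "{\"fileName\": \"" + fileName + "\", \"data\": " + processedJson + "}";
    }

    /** Local-filesystem opener, a stand-in for GCS used only to try the sketch. */
    static ChannelOpener localOpener(Path directory) {
        return fileName -> Files.newByteChannel(
                directory.resolve(fileName),
                StandardOpenOption.CREATE_NEW,
                StandardOpenOption.WRITE);
    }
}
```

Inside the DoFn, each incoming element would go through `writeRaw`, then through the existing processing step, and the output would be built with `withFileName` before the conversion to a table row. Because the file is written per element rather than per window, this sidesteps the bounded-collection restriction, at the cost of producing many small files.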

You can access the window of the data using the optional BoundedWindow parameter on @ProcessElement. I'd be able to provide more advice if you explain the desired behavior.