I am receiving messages in Dataflow via Pub/Sub in streaming mode (which my use case requires). Each message should be stored in its own file in GCS. Since TextIO.Write does not support unbounded collections, I tried to divide the PCollection into windows that contain one element each and to write each window to Google Cloud Storage.
Here is my code:
public static void main(String[] args) {
    // Configure a streaming job on the Dataflow service.
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
            .as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject(PROJECT_ID);
    options.setStagingLocation(STAGING_LOCATION);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    // Read messages from the Pub/Sub subscription as an unbounded PCollection.
    PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
            .subscription(SUBSCRIPTION);
    PCollection<String> streamData = pipeline.apply(readFromPubsub);

    // Fire a pane for every element and discard each pane once it has fired.
    PCollection<String> windowedMessage = streamData.apply(
            Window.<String>triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes());

    // The write step that the error message refers to.
    windowedMessage.apply(TextIO.Write.to("gs://pubsub-outputs/1"));

    pipeline.run();
}
I still receive the same error I got before windowing:
The DataflowPipelineRunner in streaming mode does not support TextIO.Write.
What code would accomplish what I described above, that is, writing each Pub/Sub message to its own file in GCS from a streaming pipeline?