2 votes

I am receiving messages in Dataflow via Pub/Sub in streaming mode (which is required for my use case). Each message should be stored in its own file in GCS. Since TextIO.Write does not support unbounded PCollections, I tried to divide the PCollection into windows that contain one element each and write each window to Google Cloud Storage.

Here is my code:

    public static void main(String[] args) {

        DataflowPipelineOptions options = PipelineOptionsFactory.create()
                .as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setProject(PROJECT_ID);
        options.setStagingLocation(STAGING_LOCATION);
        options.setStreaming(true);
        Pipeline pipeline = Pipeline.create(options);

        PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
                .subscription(SUBSCRIPTION);

        PCollection<String> streamData = pipeline.apply(readFromPubsub);

        PCollection<String> windowedMessage = streamData.apply(
                Window.<String>triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                      .discardingFiredPanes());

        windowedMessage.apply(TextIO.Write.to("gs://pubsub-outputs/1"));

        pipeline.run();
    }

I still receive the same error I got before windowing:

The DataflowPipelineRunner in streaming mode does not support TextIO.Write.

What is the correct code to accomplish what I described above?

Possible duplicate of Creating a custom Sink in data flow – jkff

1 Answer

2 votes

TextIO only works with bounded PCollections; instead, you can write to GCS directly with the Cloud Storage API.

You could do:

    PipeOptions options = data.getPipeline().getOptions().as(PipeOptions.class);

    // Key every element identically so the elements of a window can be grouped together.
    data.apply(WithKeys.of(new SerializableFunction<String, String>() {
            public String apply(String s) { return "mykey"; }
        }))
        // Collect the keyed elements into fixed windows of a configurable length.
        .apply(Window.<KV<String, String>>into(
                FixedWindows.of(Duration.standardMinutes(options.getTimeWrite()))))
        // Group per key and window, drop the key, and write each Iterable to GCS.
        .apply(GroupByKey.<String, String>create())
        .apply(Values.<Iterable<String>>create())
        .apply(ParDo.of(new StorageWrite(options)));
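
Note that PipeOptions is not an SDK class; it stands for a custom options interface exposing the getters used in these snippets (getTimeWrite, getGCSBucket, getBucketRepository, getFileOutName). A minimal sketch of what it could look like with the Dataflow 1.x SDK, where the default value and descriptions are only illustrative:

    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.Default;
    import com.google.cloud.dataflow.sdk.options.Description;

    // Custom options interface assumed by the snippets above and below.
    public interface PipeOptions extends DataflowPipelineOptions {

        @Description("Window length in minutes before a file is written")
        @Default.Integer(1)
        int getTimeWrite();
        void setTimeWrite(int value);

        @Description("GCS bucket that receives the output objects")
        String getGCSBucket();
        void setGCSBucket(String value);

        @Description("Folder (object name prefix) inside the bucket")
        String getBucketRepository();
        void setBucketRepository(String value);

        @Description("Base name for the output files")
        String getFileOutName();
        void setFileOutName(String value);
    }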

This creates a window over the keyed stream, groups its elements with GroupByKey, and writes each resulting Iterable to Cloud Storage. The processElement of StorageWrite:

        PipeOptions options = c.getPipelineOptions().as(PipeOptions.class);

        // Name the output object after the window's end timestamp.
        String date = ISODateTimeFormat.date().print(c.window().maxTimestamp());
        String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp());
        String blobName = String.format("%s/%s/%s",
                options.getBucketRepository(), date, options.getFileOutName() + isoDate);

        BlobId blobId = BlobId.of(options.getGCSBucket(), blobName);

        // Stream every element of the grouped window into a single GCS object.
        WriteChannel writer = storage.writer(
                BlobInfo.builder(blobId).contentType("text/plain").build());
        for (Iterator<String> it = c.element().iterator(); it.hasNext();) {
            // Note: getBytes() uses the platform default charset.
            writer.write(ByteBuffer.wrap(it.next().getBytes()));
        }
        writer.close();
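
For completeness, a minimal sketch of how that processElement body could sit inside the full StorageWrite DoFn, assuming the Dataflow 1.x SDK (where c.window() requires implementing DoFn.RequiresWindowAccess) and an older google-cloud-storage client that still exposes BlobInfo.builder(...) and StorageOptions.defaultInstance(); adjust the names to the client version you use:

    // Assumed imports: Dataflow SDK 1.x and an older google-cloud-storage client.
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    public class StorageWrite extends DoFn<Iterable<String>, Void>
            implements DoFn.RequiresWindowAccess {   // needed for c.window()

        // The Storage client is not serializable, so it is created per bundle
        // instead of being shipped with the DoFn.
        private transient Storage storage;

        // PipeOptions is the custom interface sketched above. It is re-read from
        // the context in processElement rather than stored in a field, since
        // PipelineOptions are not serialized with the DoFn.
        public StorageWrite(PipeOptions options) { }

        @Override
        public void startBundle(Context c) {
            // Assumption: default credentials and project are sufficient here.
            storage = StorageOptions.defaultInstance().service();
        }

        @Override
        public void processElement(ProcessContext c) throws Exception {
            // Body as in the snippet above: read PipeOptions from
            // c.getPipelineOptions(), build the blob name from
            // c.window().maxTimestamp(), open storage.writer(...) and write
            // every element of c.element(), then close the channel.
        }
    }

Creating the Storage client in startBundle and re-reading the options from the context keeps the DoFn itself serializable.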