I am processing a windowed stream of PubSub messages and I would like to archive them to GCS. I'd like the archived files to have a prefix that's derived from the window timestamp (something like gs://bucket/messages/2015/01/messages-2015-01-01.json). Is this possible with TextIO.Write, or do I need to implement my own FileBasedSink?
4 Answers
This can be done with the recently added feature for windowed writes in TextIO. Please see the documentation for TextIO, in particular withWindowedWrites and to(FilenamePolicy). This feature is also present in AvroIO.
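As a rough illustration of the naming part only, here is a minimal sketch of how a path like the one in the question could be derived from a window's start timestamp. It deliberately uses no Beam classes: the class name WindowedNames, the method windowedPath, and the choice of UTC are all assumptions made for this example. A custom FilenamePolicy would call logic like this from its windowed-filename method, whose exact signature differs between Beam versions, so check the Javadoc for the version you use.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Beam: builds an archive path from a window start time.
class WindowedNames {
    // Directory part, e.g. "2015/01" (UTC is an assumption; pick the zone you need).
    private static final DateTimeFormatter DIR =
        DateTimeFormatter.ofPattern("yyyy/MM").withZone(ZoneOffset.UTC);
    // File-name part, e.g. "2015-01-01".
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // baseDir is something like "gs://bucket/messages";
    // windowStartMillis is the window start in milliseconds since the epoch.
    static String windowedPath(String baseDir, long windowStartMillis) {
        Instant start = Instant.ofEpochMilli(windowStartMillis);
        return baseDir + "/" + DIR.format(start)
            + "/messages-" + DAY.format(start) + ".json";
    }

    public static void main(String[] args) {
        long jan1 = Instant.parse("2015-01-01T00:00:00Z").toEpochMilli();
        // prints gs://bucket/messages/2015/01/messages-2015-01-01.json
        System.out.println(windowedPath("gs://bucket/messages", jan1));
    }
}
```

In a real pipeline each window is usually written as several shards, so the policy would typically also fold the shard number into the file name to keep the outputs from colliding.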
Are you simply looking for the function TextIO.Write.Bound<String>.withSuffix() or TextIO.Write.Bound<String>.to()? It seems these would allow you to provide a suffix or prefix for the output filename.
Right now, TextIO.Write does not support operation in streaming mode – writing to GCS is tricky, e.g., because you can't write to a file concurrently from multiple workers and you can't append to files once they close. We have plans to add streaming support to TextIO.
You'll get the best support for this today using BigQuery rather than GCS, for three reasons: we already support BigQuery writes during streaming, you can choose which table to write to based on the window, and BigQuery supports writes from many different workers at once.
TextIO.Write ought to work. No need for custom filesink.
In your case, you want to write your PubSub messages to an output text file, not locally but on remote GCS. You ought to be able to use: PCollection .apply.TextIO.Write().to(
Since you are processing a stream of PubSub messages, your PCollection is unbounded, and your PubSub data source already provides a timestamp for each element in it.
If you wish to assign a timestamp, your ParDo transform needs to use a DoFn that outputs elements using ProcessContext.outputWithTimestamp().
In summary, you can use TextIO.Write after ensuring that the elements in your PCollection are output with timestamps.