2
votes

Referring to item: Watching for new files matching a filepattern in Apache Beam

Can you use this for simple use cases? My use case is that users upload data to Cloud Storage -> Pipeline (process CSV to JSON) -> BigQuery. I know Cloud Storage is a bounded collection, so it represents a batch Dataflow job.
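For context, the CSV-to-JSON step of that pipeline could be sketched as a plain Java helper (the class and method names `CsvToJson`/`toJson` are hypothetical, and this assumes simple comma-separated rows with a known header; a real pipeline would use a proper CSV library to handle quoting and escapes):

```java
// Hypothetical sketch of the csv-to-json step in the pipeline above.
// Assumes simple unquoted comma-separated values and a known header row.
public class CsvToJson {
    // Convert one CSV data row to a JSON object string, pairing each
    // value with the corresponding header column. Missing trailing
    // values become empty strings.
    public static String toJson(String[] header, String row) {
        String[] values = row.split(",", -1);
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < header.length; i++) {
            if (i > 0) sb.append(",");
            sb.append("\"").append(header[i]).append("\":\"")
              .append(i < values.length ? values[i] : "")
              .append("\"");
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        String[] header = {"name", "age"};
        System.out.println(toJson(header, "alice,30"));
    }
}
```

In Beam this logic would typically live inside a DoFn or MapElements applied between the TextIO read and the BigQuery write.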

What I would like to do is keep the pipeline running in streaming mode, so that as soon as a file is uploaded to Cloud Storage, it is processed through the pipeline. Is this possible with watchForNewFiles?

I wrote my code as follows:

p.apply(TextIO.read().from("<bucketname>")
    .watchForNewFiles(
        // Check for new files every 30 seconds
        Duration.standardSeconds(30),
        // Never stop checking for new files
        Watch.Growth.<String>never()));

None of the contents are being forwarded to BigQuery, even though the pipeline shows that it is streaming.


1 Answer

3
votes

You may use Google Cloud Storage triggers here: https://cloud.google.com/functions/docs/calling/storage#functions-calling-storage-python

These triggers use Cloud Functions, similar to Cloud Pub/Sub, and fire when objects are created, deleted, archived, or have their metadata changed.

These events are sent using Pub/Sub notifications from Cloud Storage, but pay attention not to set many functions over the same bucket, as there are notification limits.

Also, at the end of the document there is a link to a sample implementation.