6
votes

I have a very basic Python Dataflow job that reads some data from Pub/Sub, applies a FixedWindow and writes to Google Cloud Storage.

transformed = ...
transformed | beam.io.WriteToText(known_args.output)
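
A minimal sketch of roughly what the full pipeline looks like (the topic, window size, and output path here are placeholders, not the real values):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

# streaming=True is required when reading from Pub/Sub
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    transformed = (p
                   | beam.io.ReadFromPubSub(topic='projects/MY_PROJECT/topics/MY_TOPIC')
                   | beam.WindowInto(FixedWindows(60)))  # 1-minute fixed windows
    # in streaming mode, only the beam-temp files ever appear under this path
    transformed | beam.io.WriteToText('gs://MY_BUCKET/MY_DIR/output')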

The output is written to the location specified in --output, but only to the temporary staging location, i.e.

gs://MY_BUCKET/MY_DIR/beam-temp-2a5c0e1eec1c11e8b98342010a800004/...some_UUID...

The file never gets placed into the correctly named location with the sharding template.

Tested on both the local (direct) runner and the Dataflow runner.


When testing further, I noticed that the streaming_wordcount example has the same issue, whereas the standard wordcount example writes fine. Perhaps the issue is to do with windowing, or with reading from Pub/Sub?


It appears WriteToText is not compatible with a streaming source such as Pub/Sub. There are likely workarounds, and the Java version may be compatible, but I have opted for a different solution altogether.

2
Can you please post the code? – Tanveer Uddin
Same problem here. Have you found a solution? I tried playing with triggers, but it is not related to triggers. When I call "Drain" on my Dataflow job, the data is written to the correct folder. – Robin_
Try fileio.WriteToFiles. – Pablo

2 Answers

9
votes

The WriteToText transform in the Python SDK does not support streaming.

Instead, you may consider the transforms in apache_beam.io.fileio. In this case, you can write something like this (let's suppose 10-minute windows):

my_pcollection = (p | ReadFromPubSub(....)
                    | WindowInto(FixedWindows(10 * 60))
                    | fileio.WriteToFiles(path=known_args.output))

This is enough to write out separate files for each window, and to keep doing so as the stream advances.

You'd see files like the following (let's suppose the output is gs://mybucket/). The files would be written as the windows get triggered:

gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0000-00002
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0001-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0000-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0001-00002
...

By default, the files have $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix names, where the prefix is output, but you can pass a more complex function for file naming.


If you want to customize how the files get written (e.g. naming of the files, or format of the data, or anything like that), you can look at the extra arguments in WriteToFiles.
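
For example, here is a sketch of what a custom file_naming callable could look like (the function name and naming pattern are made up for illustration; the callable receives the window, pane, shard, and compression information that fileio passes to naming functions):

from apache_beam.io import fileio

def my_file_naming(window, pane, shard_index, total_shards, compression, destination):
    # Build any file name you like from the window bounds and shard info.
    # This particular pattern is only an example.
    start = window.start.to_utc_datetime().strftime('%Y%m%dT%H%M%S')
    return 'events-%s-%05d-of-%05d.txt' % (start, shard_index, total_shards)

# 'windowed' stands for the windowed PCollection from the snippet above
windowed | fileio.WriteToFiles(path=known_args.output,
                               file_naming=my_file_naming)

There is also fileio.default_file_naming(prefix, suffix) if all you need is a different prefix or suffix.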

You can see an example here of the transform being used in a Beam test, with more complex arguments - but it sounds like the default behavior should be enough for you.

0
votes

Python streaming pipeline execution is experimentally available (with some limitations).

The following unsupported features apply to all runners:

- State and Timers APIs
- Custom source API
- Splittable DoFn API
- Handling of late data
- User-defined custom WindowFn

Additionally, DataflowRunner does not currently support the following Cloud Dataflow-specific features with Python streaming execution:

- Streaming autoscaling
- Updating existing pipelines
- Cloud Dataflow Templates
- Some monitoring features, such as msec counters, display data, metrics, and element counts for transforms. However, logging, watermarks, and element counts for sources are supported.

https://beam.apache.org/documentation/sdks/python-streaming/

As you're using FixedWindows and the pipeline was able to write the data to the temporary location, please recheck the output location: --output gs://<your-gcs-bucket>/<your-gcs-folder>/<your-gcs-output-filename>