
I'm learning windowing and triggering concepts in Apache Beam, with the intention to:

  • read from an unbounded source (Pub/Sub)
  • write the incoming messages to local disk once per 5-second fixed window
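
For reference, the way a timestamp maps to a fixed window can be sketched in plain Python. This is only an illustration of the arithmetic behind 5-second fixed windows with zero offset; the function name `fixed_window` is hypothetical and is not part of the Beam API.

```python
def fixed_window(timestamp, size=5):
    """Return the half-open [start, end) bounds of the fixed window
    that contains `timestamp` (seconds), assuming a zero offset."""
    start = timestamp - (timestamp % size)
    return start, start + size


# Every element whose timestamp falls in [10, 15) lands in the same window.
for ts in (4, 5, 12):
    print(ts, fixed_window(ts))
```

Each window is closed independently, so the goal above amounts to producing one batch of output per window.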

ISSUE: No output is written to local disk. The pipeline did create a beam-temp-* folder and wrote some files there, but no output.csv appears in the intended destination every 5 seconds.

  • running apache-beam==2.9.0, Python 2.7.10
  • Tried both DirectRunner and DataflowRunner (with a GCS bucket as destination)

Here's the code (thanks a lot in advance for any advice):

import apache_beam as beam

# `options` (pipeline options with streaming enabled) and SUBSCRIBER
# (the Pub/Sub subscription path) are defined elsewhere.
p = beam.Pipeline(runner=None, options=options, argv=None)

# 1) Read incoming messages and 2) apply 5-second fixed windows
lines = (
    p
    | "read_sub" >> beam.io.gcp.pubsub.ReadFromPubSub(topic=None, subscription=SUBSCRIBER, with_attributes=True)
    | "window" >> beam.WindowInto(beam.window.FixedWindows(5))
)

# 3) Apply Map() to extract the message payload
output = lines | "pardo" >> beam.Map(lambda x: x.data)

# 4) Write out to local disk
output | beam.io.WriteToText('output', file_name_suffix='.csv', header='time, colname1, colname2')

p.run().wait_until_finish()

Thanks a lot in advance for any advice!

Cheers!

When running with the direct runner, can you check the output in the console? Is there any relevant information there? Any errors? - Anton
No, nothing. Only the deprecation warning: /Applications/py_venvs/dataflow_py2/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py:365: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported - Vibhor Jain
Have you checked that there are no sharded files? Something like output-0000-of-0010.csv? - Pablo
I checked already. DirectRunner only created a beam-temp-* directory where the expected output files are dumped (temporarily?), but nothing is written to the expected output directory (python pipeline.py --runner DirectRunner --input input.csv --output output --streaming) - Vibhor Jain
The behaviour is similar with the Dataflow runner, except that it outputs sharded files, but still in the beam-temp-* directory. Nothing is written to the gs:// bucket location I passed as the argument to beam.io.WriteToText() - Vibhor Jain

1 Answer


You are reading from an unbounded source and trying to write to a bounded sink. The Beam API for Java supports this through the withWindowedWrites method, but it is not supported in Python yet; it is a long-awaited and useful feature. So you need to either switch to Java or write to BigQuery instead.
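
Another workaround that is sometimes used, though not one suggested in the original answer, is to group each window's elements and write one file per window from custom code. The file-writing part could be sketched as below. The function name `write_window_file`, the file-naming scheme, and the idea of calling it from a DoFn after a GroupByKey are all assumptions for illustration, not part of the Beam API.

```python
import os


def write_window_file(output_dir, window_start, window_end, rows, header=None):
    """Write one window's rows to its own CSV file and return the path.

    Hypothetical helper: the window bounds are embedded in the file name
    so that each 5-second window produces a separate file.
    """
    if not os.path.isdir(output_dir):
        os.makedirs(output_dir)
    name = "output-{}-{}.csv".format(int(window_start), int(window_end))
    path = os.path.join(output_dir, name)
    with open(path, "w") as f:
        if header is not None:
            f.write(header + "\n")
        for row in rows:
            f.write(row + "\n")
    return path
```

Inside a pipeline, such a helper would presumably be called once per window with the grouped elements. Note that on a distributed runner such as Dataflow, a local path refers to the worker's disk rather than the machine that launched the job, so this sketch is only meaningful with the DirectRunner or with a shared filesystem.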