
I have an issue where the Dataflow job itself runs fine, but it does not produce any output until the job is manually drained.

With the following code, I assumed it would produce windowed output, effectively triggering after each window:

    lines = (
        p
        | "read" >> source
        | "decode" >> beam.Map(decode_message)
        | "Parse" >> beam.Map(parse_json)
        | beam.WindowInto(
               beam.window.FixedWindows(5*60),
               trigger=beam.trigger.Repeatedly(beam.trigger.AfterProcessingTime(5*60)),
               accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)
        | "write" >> sink 
    )

What I want is that if events have been received in a window, output should be produced after the window closes in any case. The source is a Cloud Pub/Sub topic with approximately 100 events/minute.

These are the parameters I use to start the job:

    python main.py \
        --region $(REGION) \
        --service_account_email $(SERVICE_ACCOUNT_EMAIL_TEST) \
        --staging_location gs://$(BUCKET_NAME_TEST)/beam_stage/ \
        --project $(TEST_PROJECT_ID) \
        --inputTopic $(TOPIC_TEST) \
        --outputLocation gs://$(BUCKET_NAME_TEST)/beam_output/ \
        --streaming \
        --runner DataflowRunner \
        --temp_location gs://$(BUCKET_NAME_TEST)/beam_temp/ \
        --experiments=allow_non_updatable_job \
        --disk_size_gb=200 \
        --machine_type=n1-standard-2 \
        --job_name $(DATAFLOW_JOB_NAME)

Any ideas on how to fix this? I'm using the apache-beam 2.22 SDK with Python 3.7.


1 Answer


Excuse me if you really are on 2.22 already, but in case you meant "apache-beam 1.22", that would be quite old; especially since you are using Python 3.7, you should use a newer SDK version such as 2.22.0.

> What I want is that if events have been received in a window, output should be produced after the window closes in any case. The source is a Cloud Pub/Sub topic with approximately 100 events/minute.

If you just need one pane fired per window, with fixed windows every 5 minutes, you can simply go with the default (watermark) trigger:

    beam.WindowInto(beam.window.FixedWindows(5 * 60))
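For instance, a minimal sketch of your pipeline with just the default trigger, reusing `source`, `decode_message`, `parse_json`, and `sink` from your snippet:

    lines = (
        p
        | "read" >> source
        | "decode" >> beam.Map(decode_message)
        | "Parse" >> beam.Map(parse_json)
        # Default trigger: fire once when the watermark passes the end of the window.
        | "window" >> beam.WindowInto(beam.window.FixedWindows(5 * 60))
        | "write" >> sink
    )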

If you want to customize triggers, you can take a look at the Streaming 102 document.
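For example, here is a sketch of a watermark trigger with early and late firings in the spirit of Streaming 102 (the 60-second early firing and 10-minute allowed lateness are assumptions you would tune for your job):

    beam.WindowInto(
        beam.window.FixedWindows(5 * 60),
        # Fire when the watermark passes the end of the window, with
        # speculative early panes every 60s of processing time and one
        # extra pane per late element within the allowed lateness.
        trigger=beam.trigger.AfterWatermark(
            early=beam.trigger.AfterProcessingTime(60),
            late=beam.trigger.AfterCount(1)),
        allowed_lateness=10 * 60,
        accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)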

Here is a streaming example with visualization of windowed outputs.

    from datetime import timedelta

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

    # Record 30 seconds of data from the unbounded source for replay.
    ib.options.capture_duration = timedelta(seconds=30)
    ib.evict_captured_data()

    # `topic` is your Pub/Sub topic path, e.g. projects/<project>/topics/<topic>.
    options = PipelineOptions(streaming=True)
    pstreaming = beam.Pipeline(InteractiveRunner(), options=options)

    words = (pstreaming
             | 'Read' >> beam.io.ReadFromPubSub(topic=topic)
             | 'Window' >> beam.WindowInto(beam.window.FixedWindows(5)))

    ib.show(words, visualize_data=True, include_window_info=True)

If you run this code in a notebook environment such as JupyterLab, you can debug the streaming pipeline with the windowed output visualized. Since the capture duration is 30 seconds and the fixed window is set to 5 seconds, you get 6 windows. You can bin the data by window to see which data came in which window.
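To bin by window in plain pipeline code, here is a minimal sketch using `beam.DoFn.WindowParam`, which injects the window an element belongs to (`TagWithWindow` is a hypothetical helper name):

    class TagWithWindow(beam.DoFn):
        def process(self, element, window=beam.DoFn.WindowParam):
            # Key each element by the window it was assigned to.
            yield (str(window), element)

    binned = (words
              | 'TagWindow' >> beam.ParDo(TagWithWindow())
              | 'GroupByWindow' >> beam.GroupByKey())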

You can set up your own notebook runtime by following the instructions, or you can use the hosted solution provided by Google, Dataflow Notebooks.