I have an issue where the Dataflow job itself runs fine, but it produces no output until the job is manually drained.
With the following code I assumed the pipeline would emit windowed output, effectively triggering after each window closes:
lines = (
    p
    | "read" >> source
    | "decode" >> beam.Map(decode_message)
    | "Parse" >> beam.Map(parse_json)
    | "window" >> beam.WindowInto(
        beam.window.FixedWindows(5 * 60),
        trigger=beam.trigger.Repeatedly(
            beam.trigger.AfterProcessingTime(5 * 60)),
        accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)
    | "write" >> sink
)
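For context, source and sink are defined elsewhere in main.py; they are roughly equivalent to the sketch below. The names known_args, inputTopic, and outputLocation, and the choice of fileio.WriteToFiles for the streaming write to GCS, are my simplifications rather than the exact code:

import apache_beam as beam
from apache_beam.io import fileio

# Rough sketch only: read from the Pub/Sub topic passed on the command line...
source = beam.io.ReadFromPubSub(topic=known_args.inputTopic)
# ...and write text files to the GCS output location.
sink = fileio.WriteToFiles(path=known_args.outputLocation)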
What I want is that if a window has received any events, the pipeline should produce output for that window once it closes, in any case. The source is Cloud Pub/Sub, with approximately 100 events/minute.
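From my reading of the Beam docs, the event-time behavior I am after would normally come from an AfterWatermark trigger rather than a processing-time one. Here is a minimal sketch of that variant (parsed and the "window" step name are placeholders for the pipeline above):

import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterWatermark
from apache_beam.transforms.window import FixedWindows

# Fire once the watermark passes the end of each 5-minute window, so any
# window that received events should emit exactly one discarding pane.
windowed = parsed | "window" >> beam.WindowInto(
    FixedWindows(5 * 60),
    trigger=AfterWatermark(),
    accumulation_mode=AccumulationMode.DISCARDING,
)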
These are the parameters I use to start the job:
python main.py \
--region $(REGION) \
--service_account_email $(SERVICE_ACCOUNT_EMAIL_TEST) \
--staging_location gs://$(BUCKET_NAME_TEST)/beam_stage/ \
--project $(TEST_PROJECT_ID) \
--inputTopic $(TOPIC_TEST) \
--outputLocation gs://$(BUCKET_NAME_TEST)/beam_output/ \
--streaming \
--runner DataflowRunner \
--temp_location gs://$(BUCKET_NAME_TEST)/beam_temp/ \
--experiments=allow_non_updatable_job \
--disk_size_gb=200 \
--machine_type=n1-standard-2 \
--job_name $(DATAFLOW_JOB_NAME)
Any ideas on how to fix this? I'm using the Apache Beam 2.22 SDK on Python 3.7.