I'm dealing with a larger Dataflow pipeline which works perfectly in batch mode, but the refactored streaming version has issues with side inputs. If I put the pipeline in streaming mode and remove the side input, the pipeline works perfectly on Google's Dataflow.
I stripped everything down and built the following short script to encapsulate the issue and be able to play around with it.
import apache_beam as beam
import os
import json
import sys
import logging

""" Here are the switches """
run_local = False
streaming = False

project = 'google-project-name'
bucket = 'dataflow_bucket'
tmp_location = 'gs://{}/{}/'.format(bucket, "tmp")
topic = "projects/{}/topics/dataflowtopic".format(project)

credentials_file = os.path.join(os.path.dirname(__file__), "credentials.json")
if os.path.exists(credentials_file):
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_file

if not run_local:
    runner = "DataflowRunner"
else:
    runner = "DirectRunner"

argv = [
    '--region={0}'.format("europe-west6"),
    '--project={0}'.format(project),
    '--temp_location={0}'.format(tmp_location),
    '--runner={0}'.format(runner),
    '--save_main_session',
    '--max_num_workers=2'
]
if streaming:
    argv.append('--streaming')


def filter_existing(item, existing=None, id_field: str = 'id'):
    sys.stdout.write(".")
    return item.get(id_field) not in existing


class UserProcessor(beam.DoFn):
    def process(self, user, **kwargs):
        if run_local:
            print("processing a user and getting items for it")
        else:
            logging.info("processing a user and getting items for it")
        for i in range(10):
            yield {"id": i, "item_name": "dingdong", "user": user}


class ExistingProcessor(beam.DoFn):
    def process(self, user, **kwargs):
        if run_local:
            print("creating the list to exclude items from the result")
        else:
            logging.info("creating the list to exclude items from the result")
        yield {"id": 3}
        yield {"id": 5}
        yield {"id": 9}


with beam.Pipeline(argv=argv) as p:
    if streaming:
        if run_local:
            print("Waiting for Pub/Sub Message on Topic: {}".format(topic))
        else:
            logging.info("Waiting for Pub/Sub Message on Topic: {}".format(topic))
        users = (p
                 | "Loading user" >> beam.io.ReadFromPubSub(topic=topic)
                 | beam.Map(lambda x: json.loads(x.decode())))
    else:
        if run_local:
            print("Loading Demo User")
        else:
            logging.info("Loading Demo User")
        example_user = {"id": "indi", "name": "Indiana Jones"}
        users = p | "Loading user" >> beam.Create([example_user])

    process1 = users | "load all items for user" >> beam.ParDo(UserProcessor().with_input_types(dict))
    process2 = (users
                | "load existing items for user" >> beam.ParDo(ExistingProcessor().with_input_types(dict))
                | beam.Map(lambda x: x.get('id')))

    if run_local:
        process1 | "debug process1" >> beam.Map(print)
        process2 | "debug process2" >> beam.Map(print)

    # filtered = (process1, process2) | beam.Flatten()  # this works
    filtered = process1 | "filter all against existing items" >> beam.Filter(
        filter_existing, existing=beam.pvalue.AsList(process2))  # this does NOT work when streaming, it does in batch

    if run_local:
        filtered | "debug filtered" >> beam.Map(print)
        filtered | "write down result" >> beam.io.WriteToText(os.path.join(os.path.dirname(__file__), "test_result.txt"))
    else:
        filtered | "write down result" >> beam.io.WriteToText("gs://{}/playground/output.txt".format(bucket))

    if run_local:
        print("pipeline initialized!")
    else:
        logging.info("pipeline initialized!")
Running this script as a batch job on Google's Dataflow does exactly what it needs to do. See the pipeline visualised in Dataflow:
As soon as I set the variable streaming = True and run_local = False, the job does not work anymore and returns no error at all. I'm a bit lost there.
If I disable the side input in this part of the script:
# this works in streaming AND batch mode
# filtered = (process1, process2) | beam.Flatten()  # this works

# this does NOT work in streaming mode, but DOES work in batch mode
filtered = process1 | "filter all against existing items" >> beam.Filter(
    filter_existing, existing=beam.pvalue.AsList(process2))
then the streaming job on Dataflow works. I've narrowed it down to the part beam.pvalue.AsList(process2), which is causing the problem.
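One way to double-check that it is really the deferred side input and not the Filter itself (just a debugging sketch, not a fix; the hard-coded ids simply mirror what ExistingProcessor yields) would be to swap AsList for a plain in-memory list, which I would expect to run in streaming mode because there is nothing deferred to materialize:

# Debugging sketch (not a fix): a plain Python list instead of a
# deferred side input. The ids [3, 5, 9] mirror ExistingProcessor.
filtered = process1 | "filter against static list" >> beam.Filter(
    filter_existing, existing=[3, 5, 9])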
It should also be said that I'm fairly new to Apache Beam. One thing I could think of that might be the problem is the side input not being windowed (whatever that means, I just got it from the documentation ;) )
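For reference, this is what I understand windowing the side input would look like, based on the Beam docs; a minimal sketch assuming fixed 60-second windows applied to both branches so their windows line up (the window size is made up, and I haven't verified that this actually fixes the Dataflow job):

from apache_beam.transforms import window

# Sketch: put the main input and the side input into matching fixed
# windows, so the runner can materialize AsList once per window instead
# of waiting forever on the unbounded global window.
windowed_users = users | "window users" >> beam.WindowInto(window.FixedWindows(60))

process1 = windowed_users | "load all items for user" >> beam.ParDo(UserProcessor())
process2 = (windowed_users
            | "load existing items for user" >> beam.ParDo(ExistingProcessor())
            | beam.Map(lambda x: x.get('id')))

filtered = process1 | "filter all against existing items" >> beam.Filter(
    filter_existing, existing=beam.pvalue.AsList(process2))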