0 votes

I'm dealing with a larger Dataflow pipeline which works perfectly in batch mode, but the refactoring I have done for streaming has issues with side inputs. If I put the pipeline in streaming mode and remove the side input, the pipeline works perfectly on Google's Dataflow.

I stripped everything down and built the following short script to encapsulate the issue and be able to play around with it.

import apache_beam as beam
import os
import json
import sys
import logging

""" Here the switches """
run_local = False
streaming = False

project = 'google-project-name'
bucket = 'dataflow_bucket'
tmp_location = 'gs://{}/{}/'.format(bucket, "tmp")
topic = "projects/{}/topics/dataflowtopic".format(project)

credentials_file = os.path.join(os.path.dirname(__file__), "credentials.json")
if os.path.exists(credentials_file):
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_file

if not run_local:
    runner = "DataflowRunner"
else:
    runner = "DirectRunner"

argv = [
    '--region={0}'.format("europe-west6"),
    '--project={0}'.format(project),
    '--temp_location={0}'.format(tmp_location),
    '--runner={0}'.format(runner),
    '--save_main_session',
    '--max_num_workers=2'
]
if streaming:
    argv.append('--streaming')

def filter_existing(item, existing=None, id_field: str = 'id'):
    sys.stdout.write(".")
    return item.get(id_field) not in existing


class UserProcessor(beam.DoFn):
    def process(self, user, **kwargs):

        if run_local:
            print("processing a user and getting items for it")
        else:
            logging.info("processing a user and getting items for it")
        for i in range(10):
            yield {"id": i, "item_name": "dingdong", "user": user}


class ExistingProcessor(beam.DoFn):
    def process(self, user, **kwargs):
        if run_local:
            print("creating the list to exclude items from the result")
        else:
            logging.info("creating the list to exclude items from the result")
        yield {"id": 3}
        yield {"id": 5}
        yield {"id": 9}


with beam.Pipeline(argv=argv) as p:
    if streaming:
        if run_local:
            print("Waiting for Pub/Sub Message on Topic: {}".format(topic))
        else:
            logging.info("Waiting for Pub/Sub Message on Topic: {}".format(topic))
        users = p | "Loading user" >> beam.io.ReadFromPubSub(topic=topic) | beam.Map(lambda x: json.loads(x.decode()))
    else:
        if run_local:
            print("Loading Demo User")
        else:
            logging.info("Loading Demo User")

        example_user = {"id": "indi","name": "Indiana Jones"}
        users = p | "Loading user" >> beam.Create([example_user])

    process1 = users | "load all items for user" >> beam.ParDo(UserProcessor().with_input_types(dict))
    process2 = users | "load existing items for user" >> beam.ParDo(ExistingProcessor().with_input_types(dict)) | beam.Map(lambda x: x.get('id'))

    if run_local:
        process1 | "debug process1" >> beam.Map(print)
        process2 | "debug process2" >> beam.Map(print)

    #filtered = (process1, process2) | beam.Flatten() # this works
    filtered = process1 | "filter all against existing items" >> beam.Filter(filter_existing, existing=beam.pvalue.AsList(process2)) # this does NOT work when streaming, it does in batch

    if run_local:
        filtered | "debug filtered" >> beam.Map(print)
        filtered | "write down result" >> beam.io.WriteToText(os.path.join(os.path.dirname(__file__), "test_result.txt"))
    else:
        filtered | "write down result" >> beam.io.WriteToText("gs://{}/playground/output.txt".format(bucket))

    if run_local:
        print("pipeline initialized!")
    else:
        logging.info("pipeline initialized!")

Running this script as a batch job on Google's Dataflow does exactly what it needs to do. See the pipeline visualised by Dataflow:

Dataflow Job

As soon as I set the variables streaming = True and run_local = False, the job does not work anymore and returns no error at all. I'm a bit lost there.

If I disable the side input in this part of the script:

# this works in streaming AND batch mode
# filtered = (process1, process2) | beam.Flatten() # this works

# this does NOT work in streaming mode, but DOES work in batch mode
filtered = process1 | "filter all against existing items" >> beam.Filter(filter_existing, existing=beam.pvalue.AsList(process2))

then the streaming job on Dataflow works. I've narrowed it down to the part beam.pvalue.AsList(process2), which is causing the problem.

It should also be said that I'm fairly new to Apache Beam. One thing I could think of as the cause is the side input not being windowed (whatever that means, I just got it from the documentation ;) )


2 Answers

2 votes

You are correct, the issue is that the side input is not windowed. When you run a Parallel Do (Map, FlatMap, Filter, ...) with a side input, the runner waits until the side input is "fully computed" before running the main input. In this case, it is reading from a PubSub source with no windowing, which means it is never "done" (i.e. some more data could come along in the future).

To make this work you need to window both sides; this way the side input becomes "done up to time X", and the Filter can then run "up to time X" as X jumps forward from window boundary to window boundary.
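As a rough, untested sketch (reusing the names from the script above and picking an arbitrary 10-second window), windowing the Pub/Sub branch could look like this; the downstream PCollections inherit the windowing, so the main input and the side input line up window for window:

# Untested sketch: window the Pub/Sub input right after reading it.
# All names (p, topic, UserProcessor, ExistingProcessor, filter_existing)
# come from the script in the question; the 10-second window is arbitrary.
users = (
    p
    | "Loading user" >> beam.io.ReadFromPubSub(topic=topic)
    | beam.Map(lambda x: json.loads(x.decode()))
    | "Window users" >> beam.WindowInto(beam.window.FixedWindows(10))
)

# process1 and process2 inherit the fixed windows from users, so the
# side input is "fully computed" once each window closes.
process1 = users | "load all items for user" >> beam.ParDo(UserProcessor())
process2 = (
    users
    | "load existing items for user" >> beam.ParDo(ExistingProcessor())
    | beam.Map(lambda x: x.get("id"))
)

filtered = process1 | "filter all against existing items" >> beam.Filter(
    filter_existing, existing=beam.pvalue.AsList(process2)
)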

0 votes

Thanks to the answer from @robertwb I was able to fix my code. I changed my pipeline code for the beam.io.ReadFromPubSub step and added a windowing PTransform:

users = p | "Loading user" >> beam.io.ReadFromPubSub(topic=topic) \
          | beam.Map(lambda x: json.loads(x.decode())) \
          | "Window into Fixed Intervals" >> beam.WindowInto(beam.window.FixedWindows(10))

Not sure if this is the best window for this use case, but it moved me forward a big step.
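If I understand the windowing correctly, process1 and process2 are both derived from the now-windowed users collection, so they share the same fixed windows and the AsList side input can be computed once per window instead of waiting forever on the unbounded Pub/Sub source.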