
I have a simple pipeline that receives data from Pub/Sub, prints it, and then, every 10 seconds, fires a window into a GroupByKey and prints the message again.

However, this window sometimes seems to fire late. Is this a Google limitation, or is there something wrong with my code?

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime

with beam.Pipeline(options=pipeline_options) as pipe:
    messages = (
        pipe
        | beam.io.ReadFromPubSub(subscription=known_args.input_subscription).with_output_types(bytes)
        | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
        | 'Ex' >> beam.ParDo(ExtractorAndPrinter())
        # 10-second fixed windows with a processing-time trigger
        | beam.WindowInto(
            window.FixedWindows(10),
            allowed_lateness=0,
            accumulation_mode=AccumulationMode.DISCARDING,
            trigger=AfterProcessingTime(10))
        | 'group' >> beam.GroupByKey()
        | 'PRINTER' >> beam.ParDo(PrinterWorker()))


Edit: here is the most recent code. I removed the triggers, but the problem persists:

import argparse
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    PipelineOptions, SetupOptions, StandardOptions)
from apache_beam.transforms import window


class ExtractorAndCounter(beam.DoFn):
    def __init__(self):
        beam.DoFn.__init__(self)

    def process(self, element, *args, **kwargs):
        import logging
        logging.info(element)
        # Use a single fixed key so GroupByKey collects all messages of a window together.
        return [("Message", json.loads(element)["Message"])]

class PrinterWorker(beam.DoFn):
    def __init__(self):
        beam.DoFn.__init__(self)

    def process(self, element, *args, **kwargs):
        import logging
        logging.info(element)
        return [str(element)]


class DefineTimestamp(beam.DoFn):
    def process(self, element, *args, **kwargs):
        from datetime import datetime
        return [(str(datetime.now()), element)]

def run(argv=None, save_main_session=True):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output_topic',
        required=True,
        help=(
            'Output PubSub topic of the form '
            '"projects/<PROJECT>/topics/<TOPIC>".'))
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--input_topic',
        help=(
            'Input PubSub topic of the form '
            '"projects/<PROJECT>/topics/<TOPIC>".'))
    group.add_argument(
        '--input_subscription',
        help=(
            'Input PubSub subscription of the form '
            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline_options.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=pipeline_options) as pipe:
        messages = (
            pipe
            # Exactly one of these is set, because the two flags are mutually exclusive.
            | beam.io.ReadFromPubSub(
                topic=known_args.input_topic,
                subscription=known_args.input_subscription).with_output_types(bytes)
            | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
            | 'Ex' >> beam.ParDo(ExtractorAndCounter())
            | beam.WindowInto(window.FixedWindows(10))
            | 'group' >> beam.GroupByKey()
            | 'PRINTER' >> beam.ParDo(PrinterWorker())
            | 'encode' >> beam.Map(lambda x: x.encode('utf-8'))
            | beam.io.WriteToPubSub(known_args.output_topic))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Did you try not specifying a trigger? - rmesteves
Yep, I did. Same issue. - BryceSoker
Could you provide the whole code so I can try to reproduce your problem? - rmesteves
@rmesteves here ya go - BryceSoker
I reproduced your problem and got similar results. Can you explain exactly what your problem is? I mean, which delay is actually bothering you? - rmesteves

2 Answers


So what this basically asks the pipeline to do is group elements into 10-second windows and fire each window once 10 seconds have passed since the first element of that window was received (discarding the rest of the data for that window). Was that your intention?
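To make the windowing part concrete, here is a minimal pure-Python sketch of how 10-second fixed windows are assigned from an element's event timestamp: the window start is the timestamp rounded down to a multiple of the window size (shifted by an optional offset). This is a simplified model of what `FixedWindows(10)` does, not Beam's own code; `fixed_window` is a hypothetical helper and timestamps are plain seconds.

```python
def fixed_window(event_ts, size=10, offset=0):
    """Return the [start, end) fixed window that contains event_ts.

    Simplified model of FixedWindows(size, offset); timestamps are in seconds.
    """
    start = event_ts - (event_ts - offset) % size
    return (start, start + size)


# Elements a few seconds apart may share a window or fall into different ones.
for ts in (52, 55, 58, 61):
    print(ts, fixed_window(ts))
```

Here 52, 55 and 58 all map to `(50, 60)`, while 61 maps to `(60, 70)`.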

Assuming that was the case, note that when a trigger fires depends on the time elements are received by the system, and in particular on when the first element of each window is received. That is probably why you are seeing some variation in your results.
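The effect can be reproduced with a toy simulation. The sketch below is a simplified model, not Beam's implementation, and assumes that each window fires exactly once at (arrival time of its first element) + 10 seconds, that an element arriving exactly at the firing time still makes it into the pane, and that anything arriving later is dropped. `simulate_processing_time_trigger` and the sample events are hypothetical.

```python
from collections import defaultdict


def simulate_processing_time_trigger(events, size=10, delay=10):
    """Toy model of FixedWindows(size) plus a one-shot AfterProcessingTime(delay).

    events: (event_ts, arrival_ts, value) tuples, all in seconds.
    Returns {window_start: (fire_time, values_in_pane, dropped_values)}.
    """
    by_window = defaultdict(list)
    for event_ts, arrival_ts, value in events:
        start = event_ts - event_ts % size  # window comes from event time
        by_window[start].append((arrival_ts, value))

    result = {}
    for start, items in sorted(by_window.items()):
        items.sort(key=lambda item: item[0])  # order by arrival (processing) time
        fire_time = items[0][0] + delay       # clock starts at the first arrival
        fired = [v for t, v in items if t <= fire_time]
        dropped = [v for t, v in items if t > fire_time]  # trigger already fired once
        result[start] = (fire_time, fired, dropped)
    return result


# Made-up timings: (event time, arrival time, value)
events = [
    (52, 52.5, "a"),
    (58, 58.5, "b"),
    (59, 63, "c"),   # same window as "a", but arrives after that pane fired
    (61, 61.5, "d"),
]
print(simulate_processing_time_trigger(events))
# {50: (62.5, ['a', 'b'], ['c']), 60: (71.5, ['d'], [])}
```

With these timings, `c` belongs to window [50, 60) by event time but misses its pane because it arrives after the pane fired; shifting the first arrival by a second shifts the firing time by the same amount.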

If you need more consistent grouping of your elements, I think you should use event-time triggers instead of processing-time triggers.
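For comparison, here is a toy model of an event-time trigger. In Beam this role is played by `AfterWatermark` (from `apache_beam.transforms.trigger`), which fires a window once the watermark passes the end of the window. The sketch assumes a perfect watermark with no late data and is not Beam's implementation; `fire_on_watermark` and the sample data are hypothetical. It shows that pane contents then depend only on event timestamps, not on arrival order.

```python
from collections import defaultdict


def fire_on_watermark(events, watermark, size=10):
    """Toy model of FixedWindows(size) with an event-time (watermark) trigger.

    events: (event_ts, value) pairs in arrival order, in seconds.
    A window [start, start + size) emits its pane once the watermark reaches
    its end; windows that have not closed yet stay buffered.
    Returns {window_start: sorted values} for windows that have fired.
    """
    panes = defaultdict(list)
    for event_ts, value in events:
        start = event_ts - event_ts % size
        if start + size <= watermark:  # window is complete in event time
            panes[start].append(value)
    return {start: sorted(values) for start, values in sorted(panes.items())}


arrived_in_order = [(52, "a"), (58, "b"), (59, "c"), (61, "d")]
arrived_shuffled = [(61, "d"), (59, "c"), (52, "a"), (58, "b")]

# Same pane for both arrival orders; window [60, 70) has not closed yet.
print(fire_on_watermark(arrived_in_order, watermark=60))  # {50: ['a', 'b', 'c']}
print(fire_on_watermark(arrived_shuffled, watermark=60))  # {50: ['a', 'b', 'c']}
```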


All triggers are best effort, which means they fire some time after the specified duration (10 seconds in this case). Firing generally happens close to the specified time, but a delay of a few seconds is possible.

Also, triggers are set per key and window, and the window is derived from event time. It is possible that the first GBK print, at 10:30:04, is due to the first element, which was at 10:29:52, and that the second GBK print, at 10:30:07, is due to the first element at 10:29:56.
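Under this hypothesis (each GBK print belongs to a pane whose first element arrived about 10 seconds earlier), the gap between the expected and observed firing times is simple arithmetic. The sketch assumes the 10-second trigger delay from the question and an arbitrary calendar date; only the times of day come from the answer. `lag_after_deadline` is a hypothetical helper.

```python
from datetime import datetime, timedelta

TRIGGER_DELAY = timedelta(seconds=10)  # delay from the question's trigger
DAY = "2020-01-01 "                    # arbitrary date; only times of day matter
FMT = "%Y-%m-%d %H:%M:%S"


def lag_after_deadline(first_element, gbk_print):
    """Seconds between (first element + trigger delay) and the observed GBK print."""
    first = datetime.strptime(DAY + first_element, FMT)
    printed = datetime.strptime(DAY + gbk_print, FMT)
    return (printed - (first + TRIGGER_DELAY)).total_seconds()


print(lag_after_deadline("10:29:52", "10:30:04"))  # 2.0
print(lag_after_deadline("10:29:56", "10:30:07"))  # 1.0
```

Gaps of one to two seconds like these are consistent with best-effort trigger firing.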

So it would help to print the window and the event timestamp of each element and then correlate the data.
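One way to do that correlation: in Beam, a `DoFn.process` method can receive the element's event timestamp and window through the `beam.DoFn.TimestampParam` and `beam.DoFn.WindowParam` default arguments, so both the extractor and the printer can log them. The pure-Python helper below then joins the two logs by window. The log layout (`(event_ts, window_start)` before the GroupByKey, `(window_start, print_ts)` after it), the helper name `correlate`, and the numbers in the example are made up for illustration.

```python
from collections import defaultdict


def correlate(element_logs, pane_logs, size=10):
    """Join per-element logs with per-pane GBK logs by window.

    element_logs: (event_ts, window_start) pairs logged before the GroupByKey.
    pane_logs: (window_start, print_ts) pairs logged by the printer after it.
    All timestamps are in seconds.
    """
    by_window = defaultdict(list)
    for event_ts, window_start in element_logs:
        by_window[window_start].append(event_ts)

    report = {}
    for window_start, print_ts in pane_logs:
        events = by_window.get(window_start, [])
        report[window_start] = {
            "count": len(events),
            "first_event": min(events) if events else None,
            # How long after the window's end (in event time) the pane was printed.
            "fired_after_window_end": print_ts - (window_start + size),
        }
    return report


elements = [(52, 50), (56, 50), (61, 60)]
panes = [(50, 64), (60, 73)]
print(correlate(elements, panes))
```

For these sample logs, window 50 holds two elements and was printed 4 seconds after its end; window 60 holds one element and was printed 3 seconds after its end.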