
According to the Apache Beam documentation:

The AfterWatermark trigger operates on event time. The AfterWatermark trigger emits the contents of a window after the watermark passes the end of the window, based on the timestamps attached to the data elements. The watermark is a global progress metric, and is Beam’s notion of input completeness within your pipeline at any given point. AfterWatermark only fires when the watermark passes the end of the window.

The default trigger for a PCollection is based on event time, and emits the results of the window when the Beam’s watermark passes the end of the window, and then fires each time late data arrives. However, if you are using both the default windowing configuration and the default trigger, the default trigger emits exactly once, and late data is discarded.

I tried both triggers with a fixed window and got similar outputs.

With the AfterWatermark trigger:

```python
import json

import apache_beam as beam
from apache_beam.transforms import trigger, window

# lines, get_timestamp, PrintFn and known_args are defined elsewhere in the pipeline.
(
    lines
    | 'timestamp' >> beam.Map(get_timestamp)
    | 'window' >> beam.WindowInto(
        window.FixedWindows(20),
        trigger=trigger.AfterWatermark(),
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
    )
    | 'CountGlobally' >> beam.CombineGlobally(
        beam.combiners.CountCombineFn()
    ).without_defaults()
    | 'printnbrarticles' >> beam.ParDo(PrintFn())
    | 'jsondumps' >> beam.Map(lambda x: json.dumps(x))
    | 'encode' >> beam.Map(lambda x: x.encode('utf-8'))
    | 'send_to_Pub/Sub' >> beam.io.WriteToPubSub(known_args.out_topic)
)
```

With the default trigger:

```python
import json

import apache_beam as beam
from apache_beam.transforms import window

# Same pipeline; WindowInto has no trigger argument, so the default trigger is used.
(
    lines
    | 'timestamp' >> beam.Map(get_timestamp)
    | 'window' >> beam.WindowInto(window.FixedWindows(20))
    | 'CountGlobally' >> beam.CombineGlobally(
        beam.combiners.CountCombineFn()
    ).without_defaults()
    | 'printnbrarticles' >> beam.ParDo(PrintFn())
    | 'jsondumps' >> beam.Map(lambda x: json.dumps(x))
    | 'encode' >> beam.Map(lambda x: x.encode('utf-8'))
    | 'send_to_Pub/Sub' >> beam.io.WriteToPubSub(known_args.out_topic)
)
```

Answer


You aren't seeing a difference because you don't have any late data. As the documentation describes, AfterWatermark fires only once, when the watermark passes the end of the window. The default trigger fires when the watermark passes the end of the window AND again each time late data arrives.
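The difference only appears once an element arrives after the watermark has passed the end of its window. The toy model below is plain Python, not Beam, and every name in it is hypothetical. It tracks a single fixed window in discarding mode and returns the element count of each firing. `repeat_for_late=False` stands in for `AfterWatermark()`, and `repeat_for_late=True` stands in for the default trigger. It also models allowed lateness: an element that arrives once the watermark has reached the window end plus the allowed lateness is dropped.

```python
from typing import List, Sequence, Tuple

# Each event is ('element', event_time) or ('watermark', new_watermark).
Event = Tuple[str, float]


def simulate_window(stream: Sequence[Event], window_end: float,
                    repeat_for_late: bool, allowed_lateness: float = 0) -> List[int]:
    """Toy model of one fixed window ending at window_end, discarding mode.

    repeat_for_late=False mimics AfterWatermark(): fire once, then finish.
    repeat_for_late=True mimics the default trigger: fire at the watermark,
    then once more for every late element that allowed lateness admits.
    Returns the element count of each firing (pane).
    """
    panes: List[int] = []
    buffered = 0                 # elements seen since the last firing
    watermark = float('-inf')
    fired_on_time = False
    finished = False             # a finished trigger never fires again
    for kind, t in stream:
        if kind == 'watermark':
            watermark = max(watermark, t)
            if not fired_on_time and watermark >= window_end:
                panes.append(buffered)   # on-time firing
                buffered = 0
                fired_on_time = True
                finished = not repeat_for_late
        elif kind == 'element':
            if t >= window_end:
                continue                 # belongs to a later window; ignored here
            if finished or watermark >= window_end + allowed_lateness:
                continue                 # trigger finished or element too late: dropped
            buffered += 1
            if fired_on_time:
                panes.append(buffered)   # late firing for this element
                buffered = 0
    return panes


on_time_only = [('element', 3), ('element', 7), ('element', 12), ('watermark', 20)]
with_late = [('element', 3), ('element', 7), ('watermark', 20), ('element', 5), ('element', 9)]

print(simulate_window(on_time_only, 20, repeat_for_late=False))  # [3]
print(simulate_window(on_time_only, 20, repeat_for_late=True))   # [3]
print(simulate_window(with_late, 20, repeat_for_late=True))      # [2] (lateness 0)
print(simulate_window(with_late, 20, repeat_for_late=False, allowed_lateness=30))  # [2]
print(simulate_window(with_late, 20, repeat_for_late=True, allowed_lateness=30))   # [2, 1, 1]
```

With only on-time data, both policies emit a single pane. This matches the identical outputs in the question. Extra late panes appear only when late data is present and still within the allowed lateness.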

In addition, the AfterWatermark trigger lets you configure extra firings. Early firings emit speculative results before the watermark reaches the end of the window. Late firings emit updated results when data arrives after the watermark has passed the end of the window.

You can't customize this with the default trigger.