
I am trying to understand windowing and triggers. I have created a streaming pipeline that runs on the DirectRunner. The pipeline does the following:

  • Reading data from Pub/Sub
  • Parsing the data
  • Assigning timestamps
  • Windowing using FixedWindows() of 100 seconds
  • Grouping the elements in that window
  • Printing the output (with the start and end of each window)
  • Finally, writing to another Pub/Sub topic

Data

Counts, Unix_Timestamp
1, 1553578200
2, 1553578201
3, 1553578202
4, 1553578203
...
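(For reference, a stream like this could be produced by a small publisher along the lines of the sketch below; the project and topic names are placeholders, not part of my actual setup.)

from google.cloud import pubsub_v1
import time

publisher = pubsub_v1.PublisherClient()
# Placeholder project/topic; substitute your own.
topic_path = publisher.topic_path('my-project', 'my-input-topic')

count = 1
start = 1553578200
while True:
    # Emit "count, unix_timestamp" messages one second apart.
    message = '{}, {}'.format(count, start + count - 1)
    publisher.publish(topic_path, data=message.encode('utf-8'))
    count += 1
    time.sleep(1)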

Beam Pipeline

def encode_byte_string(element):
  # Stringify the grouped record and encode it, since Pub/Sub expects bytes.
  element = str(element)
  print(element)
  return element.encode('utf-8')

def custom_timestamp(message):
  # Each message is a "count, unix_timestamp" string; use the second field
  # as the element's event timestamp.
  data, time_stamp = message.split(',')
  return beam.window.TimestampedValue(data, int(time_stamp))

class BuildRecordFn(beam.DoFn):
  def process(self, element, window=beam.DoFn.WindowParam):
    # Append the enclosing window's start and end timestamps to each
    # (key, grouped values) record.
    window_start = window.start
    window_end = window.end
    return [element + (window_start, window_end)]


pubsub_data = (
                p
                | 'Read from pub sub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                | 'Remove extra chars' >> beam.Map(lambda data: data.decode('utf-8').strip())
                | 'CustomTimestamp' >> beam.Map(custom_timestamp)
                | 'Window' >> beam.WindowInto(window.FixedWindows(100))
                | 'Form Key Value pair' >> beam.Map(lambda x: (1, int(x)))
                | 'Group values' >> beam.GroupByKey()
                | 'AddWindowEndTimestamp' >> beam.ParDo(BuildRecordFn())
                | 'Encode to byte string' >> beam.Map(encode_byte_string)
                | 'Write to pub sub' >> beam.io.WriteToPubSub(output_topic)
              )


result = p.run()
result.wait_until_finish()
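
For completeness, p above is assumed to be a pipeline with streaming enabled; a minimal sketch (the project, subscription, and topic names are placeholders):

import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Placeholder resource names; substitute your own.
input_subscription = 'projects/my-project/subscriptions/my-subscription'
output_topic = 'projects/my-project/topics/my-output-topic'

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True  # Pub/Sub sources require streaming mode
options.view_as(StandardOptions).runner = 'DirectRunner'

p = beam.Pipeline(options=options)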

According to the docs

The default trigger for a PCollection is based on event time, and emits the results of the window when the Beam’s watermark passes the end of the window, and then fires each time late data arrives.
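As I read it, that default behaviour could be spelled out explicitly like this (my own sketch, not taken from the docs; discarding should be the default accumulation mode):

from apache_beam.transforms.trigger import AccumulationMode, AfterWatermark

# Explicit form of the default trigger as I understand it: fire when the
# watermark passes the end of the window.
explicit_window = beam.WindowInto(
    window.FixedWindows(100),
    trigger=AfterWatermark(),
    accumulation_mode=AccumulationMode.DISCARDING)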

I am expecting to get results something like:

(1, [1, 2, 3, 4, ..., 100], Timestamp(1553578200), Timestamp(1553578300))
(1, [101, 102, 103, ..., 200], Timestamp(1553578300), Timestamp(1553578400))

However, when I run this pipeline, I get early results before the watermark even passes the end of the window:

(1, [11], Timestamp(1553578200), Timestamp(1553578300))
(1, [12, 16, 15], Timestamp(1553578200), Timestamp(1553578300))
(1, [19, 18, 8, 10, 23, 21, 1, 7, 9, 13], Timestamp(1553578200), Timestamp(1553578300))
(1, [5, 6, 14, 17, 20, 22], Timestamp(1553578200), Timestamp(1553578300))
(1, [33], Timestamp(1553578200), Timestamp(1553578300))
(1, [3], Timestamp(1553578200), Timestamp(1553578300))
(1, [24, 28, 29, 37, 39, 43], Timestamp(1553578200), Timestamp(1553578300))
(1, [2, 4], Timestamp(1553578200), Timestamp(1553578300))
(1, [48], Timestamp(1553578200), Timestamp(1553578300))
(1, [25, 31, 34, 36, 38, 40, 46, 49, 51], Timestamp(1553578200), Timestamp(1553578300))
(1, [26, 27, 30, 32, 41, 42, 45, 47], Timestamp(1553578200), Timestamp(1553578300))
(1, [44, 52], Timestamp(1553578200), Timestamp(1553578300))
(1, [35, 50], Timestamp(1553578200), Timestamp(1553578300))

What could be the reason for this?


1 Answer


This seems like the case of late data described here: https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data

One way to mitigate this kind of issue is to add some "delay" using withAllowedLateness. However, that function seems to be available only in the Java SDK right now.
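(Newer Python SDK releases do expose an allowed_lateness argument on WindowInto; if your version has it, a sketch would be:)

# Sketch only; check that your SDK version supports allowed_lateness.
windowing_with_lateness = beam.WindowInto(
    window.FixedWindows(100),
    allowed_lateness=60)  # tolerate data up to 60s after the window ends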

Another option is probably to use an AfterWatermark trigger, but I haven't tried it yet, so take it with a pinch of salt.
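
For the trigger route, something like this might work as a drop-in for the 'Window' step in your pipeline (untested sketch):

from apache_beam.transforms.trigger import AccumulationMode, AfterWatermark

# Untested: fire once the watermark passes the end of the window, and
# accumulate panes so any late firings include the earlier elements.
windowing_with_trigger = beam.WindowInto(
    window.FixedWindows(100),
    trigger=AfterWatermark(),
    accumulation_mode=AccumulationMode.ACCUMULATING)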