I am trying to understand windowing and triggers. I have created a streaming pipeline that runs on the DirectRunner. The pipeline does the following:
- Reads data from Pub/Sub
- Parses the data
- Assigns timestamps
- Windows the data using FixedWindows() of 100 seconds
- Groups the elements in each window
- Prints the output (with the start and end of the window)
- Finally, writes to another Pub/Sub topic
Data
Counts, Unix_Timestamp
1, 1553578200
2, 1553578201
3, 1553578202
4, 1553578203
...
Beam Pipeline
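import apache_beam as beam
from apache_beam import window


def encode_byte_string(element):
    # Convert the output record to a UTF-8 byte string for Pub/Sub.
    #element = ', '.join(element)
    #count = str(count)
    element = str(element)
    print(element)
    return element.encode('utf-8')


def custom_timestamp(message):
    # message is a "count, unix_timestamp" string, e.g. "1, 1553578200"
    data, time_stamp = message.split(',')
    # Attach the parsed Unix timestamp as the element's event time.
    return beam.window.TimestampedValue(data, int(time_stamp))


class BuildRecordFn(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        # Append the window bounds to the grouped (key, values) tuple.
        window_start = window.start
        window_end = window.end
        return [element + (window_start,) + (window_end,)]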
pubsub_data = (
    p
    | 'Read from pub sub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
    | 'Remove extra chars' >> beam.Map(lambda data: (data.rstrip().lstrip()))
    | 'CustomTimestamp' >> beam.Map(custom_timestamp)
    | 'Window' >> beam.WindowInto(window.FixedWindows(100))
    | 'Form Key Value pair' >> beam.Map(lambda x: (1, int(x)))
    | 'Sum values' >> beam.GroupByKey()
    | 'AddWindowEndTimestamp' >> beam.ParDo(BuildRecordFn())
    | 'Encode to byte string' >> beam.Map(encode_byte_string)
    | 'Write to pub sub' >> beam.io.WriteToPubSub(output_topic)
)

result = p.run()
result.wait_until_finish()
According to the docs:
The default trigger for a PCollection is based on event time, and emits the results of the window when the Beam’s watermark passes the end of the window, and then fires each time late data arrives.
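To make that sentence concrete for myself, here is a toy model in plain Python of how I read it. This is only a sketch of my understanding, not Beam's implementation: the function name `simulate_default_trigger` and the event encoding are made up for illustration, and whether late data is actually kept in Beam also depends on the allowed lateness, which this model ignores.

```python
from collections import defaultdict

WINDOW_SIZE = 100  # seconds, same size as FixedWindows(100)


def simulate_default_trigger(events):
    """Toy model of the quoted behaviour (hypothetical helper, not Beam code).

    `events` is a list of ("element", value, event_time) and
    ("watermark", time) tuples in arrival order.
    Returns the emitted panes as (window_start, window_end, values).
    """
    buffers = defaultdict(list)  # window start -> values not yet emitted
    watermark = float("-inf")
    panes = []

    for event in events:
        if event[0] == "element":
            _, value, event_time = event
            start = event_time - event_time % WINDOW_SIZE
            if watermark >= start + WINDOW_SIZE:
                # Late data: the watermark already passed this window,
                # so the trigger fires again for the late element.
                panes.append((start, start + WINDOW_SIZE, [value]))
            else:
                buffers[start].append(value)
        else:
            watermark = max(watermark, event[1])
            # Fire every buffered window whose end the watermark has passed.
            for start in sorted(buffers):
                if watermark >= start + WINDOW_SIZE:
                    panes.append((start, start + WINDOW_SIZE, buffers.pop(start)))
    return panes


events = [
    ("element", 1, 1553578200),
    ("element", 2, 1553578201),
    ("watermark", 1553578250),   # window not complete yet, nothing fires
    ("element", 3, 1553578202),
    ("watermark", 1553578300),   # watermark passes the end of the window
    ("element", 4, 1553578203),  # arrives late
]
for pane in simulate_default_trigger(events):
    print(pane)
```

In this model nothing is emitted while the watermark is still inside the window, which is the behaviour I expected from the real pipeline.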
I expect to get results something like:
(1, [1, 2, 3, 4, ..., 100], Timestamp(1553578200), Timestamp(1553578300))
(1, [101, 102, 103, 104, ..., 200], Timestamp(1553578300), Timestamp(1553578400))
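The grouping I expect can be checked without Beam. The sketch below assumes that count n carries the timestamp 1553578200 + n - 1, as the sample data suggests, and that a fixed window with zero offset starts at the timestamp rounded down to a multiple of the window size. The helper names `window_bounds` and `group_by_window` are my own and not part of Beam.

```python
WINDOW_SIZE = 100             # seconds, same size as FixedWindows(100)
FIRST_TIMESTAMP = 1553578200  # timestamp of the first sample record


def window_bounds(event_time, size=WINDOW_SIZE):
    """Return (start, end) of the fixed window containing event_time."""
    start = event_time - event_time % size
    return start, start + size


def group_by_window(records):
    """Group (count, event_time) records by their fixed window."""
    groups = {}
    for count, event_time in records:
        groups.setdefault(window_bounds(event_time), []).append(count)
    return groups


# Assumed input: counts 1..200, one record per second.
records = [(n, FIRST_TIMESTAMP + n - 1) for n in range(1, 201)]

for (start, end), counts in group_by_window(records).items():
    print(start, end, counts[0], counts[-1], len(counts))
# 1553578200 1553578300 1 100 100
# 1553578300 1553578400 101 200 100
```

Under these assumptions, count 100 has timestamp 1553578299 and so still belongs to the first window, and each window should come out as a single group of 100 elements.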
However, when I run this pipeline, I get early results, even before the end of the window has passed:
(1, [11], Timestamp(1553578200), Timestamp(1553578300))
(1, [12, 16, 15], Timestamp(1553578200), Timestamp(1553578300))
(1, [19, 18, 8, 10, 23, 21, 1, 7, 9, 13], Timestamp(1553578200), Timestamp(1553578300))
(1, [5, 6, 14, 17, 20, 22], Timestamp(1553578200), Timestamp(1553578300))
(1, [33], Timestamp(1553578200), Timestamp(1553578300))
(1, [3], Timestamp(1553578200), Timestamp(1553578300))
(1, [24, 28, 29, 37, 39, 43], Timestamp(1553578200), Timestamp(1553578300))
(1, [2, 4], Timestamp(1553578200), Timestamp(1553578300))
(1, [48], Timestamp(1553578200), Timestamp(1553578300))
(1, [25, 31, 34, 36, 38, 40, 46, 49, 51], Timestamp(1553578200), Timestamp(1553578300))
(1, [26, 27, 30, 32, 41, 42, 45, 47], Timestamp(1553578200), Timestamp(1553578300))
(1, [44, 52], Timestamp(1553578200), Timestamp(1553578300))
(1, [35, 50], Timestamp(1553578200), Timestamp(1553578300))
What could be the reason for this?