I am reading events from PubSub and the goal is to group them into windows. I would like to make the end of each window coincide with the minutes 0, 15, 30 and 45 of each hour.
Since this is a streaming job, it could be launched at any time, and I would like to find a way to align the size of the first window with the next ones.
The flow would be:

  1. Launch the job
  2. Define as window_size the time remaining between this moment and the next quarter of an hour
  3. Starting from the end of this first window, set the window_size = int(15*60) (seconds).

For example:

  1. Launch the job
  2. Now it's 11:18, so fix window_size = (11:30-11:18).seconds
  3. When this first window ends, set window_size = int(15*60) (seconds)
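
The arithmetic in step 2 could be sketched roughly as follows. This is only an illustration of the calculation: the helper name seconds_to_next_quarter and the date used are hypothetical, and nothing here is part of the Beam API.

```python
from datetime import datetime, timedelta


def seconds_to_next_quarter(now: datetime) -> int:
    """Seconds from `now` until the next :00, :15, :30 or :45 boundary.

    Hypothetical helper, for illustration only.
    """
    quarter = timedelta(minutes=15)
    # Truncate to the start of the current hour.
    hour_start = now.replace(minute=0, second=0, microsecond=0)
    # Number of whole quarter-hours already elapsed in this hour.
    elapsed_quarters = (now - hour_start) // quarter
    # The next boundary is strictly after `now`.
    next_boundary = hour_start + (elapsed_quarters + 1) * quarter
    return int((next_boundary - now).total_seconds())


# Launching at 11:18 leaves 12 minutes until 11:30 (the date is arbitrary).
first_window_size = seconds_to_next_quarter(datetime(2021, 1, 1, 11, 18))
print(first_window_size)
```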

In one of the examples provided by Google, the pipeline working with windowing is defined as follows, where window_size is a parameter passed as input by the user:

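import apache_beam as beam
import apache_beam.transforms.window as window

def expand(self, pcoll):
    return (
        pcoll
        # Assign each element to a fixed-size window based on its timestamp.
        | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size))
        # Use a single dummy key so that all elements in a window are grouped together.
        | "Add Key" >> beam.Map(lambda elem: (None, elem))
        | "Groupby" >> beam.GroupByKey()
        # Drop the dummy key and keep only the grouped elements.
        | "Abandon Key" >> beam.MapTuple(lambda _, val: val)
    )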
According to the documentation, windowing subdivides a PCollection according to the timestamps of its individual elements. Having said that, I would first like to ask how your job is executed. - Alexandre Moraes
As I mentioned in the question, it is a streaming job that receives events from PubSub, groups them into windows and then does other operations in the next steps of the pipeline. I would like to understand whether it is possible to resize the window during execution, assigning one window_size to the first window and another one from the second window onwards. - Federico Barusco

1 Answer


Your use case is a perfect fit for Beam!

First, there is a basic conceptual issue to clear up:

  • The timestamps on elements, used for windowing, are called "event time". They are part of the data and describe when some event in your stream happened.
  • The time of launching and running the job is called "processing time". It is not part of your data.

You will be more successful if you do not combine or confuse these two. Windows do not "start" or "end" as part of your job's processing time. Windows "exist" for all time.

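Using FixedWindows of 15 minutes will do just what you want. By default, FixedWindows are aligned to the Unix epoch, so 15-minute windows always end at minutes 0, 15, 30 and 45 of each hour. Every event is associated with the 15-minute interval that its timestamp falls into. Neither the time you launch your job nor the time an event arrives for processing affects this.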

UPDATE: adding example to illustrate:

Suppose you launch your job at 11:18 as in your question, and assume the incoming events are generated around the same time. Suppose the following events come in, in this order, with the timestamps indicated:

  • A @ 11:01
  • B @ 11:18
  • C @ 11:15
  • D @ 11:31
  • E @ 11:29

The elements will be assigned to windows as follows:

  • A in [11:00, 11:15)
  • B in [11:15, 11:30)
  • C in [11:15, 11:30)
  • D in [11:30, 11:45)
  • E in [11:15, 11:30)
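
The assignment above can be reproduced with a few lines of plain Python. This is a minimal sketch that mirrors the epoch-aligned arithmetic of fixed windows with the default offset of zero; it does not call Beam, and the helper names fixed_window and at, as well as the date, are hypothetical.

```python
from datetime import datetime, timezone

WINDOW_SIZE = 15 * 60  # seconds


def fixed_window(ts: datetime, size: int = WINDOW_SIZE):
    """Return the [start, end) window containing `ts`, aligned to the Unix epoch."""
    epoch_seconds = int(ts.timestamp())
    # Round the timestamp down to the nearest multiple of the window size.
    start = epoch_seconds - epoch_seconds % size
    return (
        datetime.fromtimestamp(start, tz=timezone.utc),
        datetime.fromtimestamp(start + size, tz=timezone.utc),
    )


def at(hour: int, minute: int) -> datetime:
    """Build an event timestamp on an arbitrary (hypothetical) date, in UTC."""
    return datetime(2021, 1, 1, hour, minute, tzinfo=timezone.utc)


# Events in arrival order; only the event timestamp matters for assignment.
events = {
    "A": at(11, 1),
    "B": at(11, 18),
    "C": at(11, 15),
    "D": at(11, 31),
    "E": at(11, 29),
}

assignments = {name: fixed_window(ts) for name, ts in events.items()}
for name, (start, end) in assignments.items():
    print(f"{name} in [{start:%H:%M}, {end:%H:%M})")
```

The job's launch time does not appear anywhere in the calculation, and shuffling the events changes nothing about which window each one lands in.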

Note that the window assignment is unrelated to when you started your job, or when the event arrives, or the order of arrival. You could actually start it tomorrow, or re-run it on archived data, or on data that isn't even close to in order, and the result would be the same. Event time windowing is based on the data.