I'm quite new to Apache Beam and have just implemented my first pipelines.

But now I have reached a point where I am confused about how to combine windowing and joining.


Problem definition:

I have two streams of data, one with users' pageviews and another with users' requests. They share the key session_id, which identifies the user's session, but each carries additional data of its own.

The goal is to compute the number of pageviews in a session before a request happened. That is, I want a stream that contains every request together with the number of pageviews that preceded it. It suffices to count the pageviews of, let's say, the last 5 minutes.
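As a reference for the expected output, here is a minimal batch sketch in plain Python (not Beam). It assumes every event is a dict with a session_id and a hypothetical 'ts' field holding epoch seconds; the function name and the sample data are made up for illustration.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60  # look-back horizon from the problem statement


def count_pageviews_before_requests(pageviews, requests,
                                    window_seconds=WINDOW_SECONDS):
    """Attach to each request the number of pageviews of the same session
    that happened at most `window_seconds` before it."""
    pageview_times = defaultdict(list)
    for pageview in pageviews:
        pageview_times[pageview['session_id']].append(pageview['ts'])

    extended = []
    for request in requests:
        num_pageviews = sum(
            1
            for ts in pageview_times[request['session_id']]
            if request['ts'] - window_seconds <= ts <= request['ts']
        )
        extended.append(dict(request, num_pageviews=num_pageviews))
    return extended


# Hypothetical sample data
sample_pageviews = [
    {'session_id': 'a', 'ts': 0},    # older than 5 minutes at request time
    {'session_id': 'a', 'ts': 100},
    {'session_id': 'a', 'ts': 390},
    {'session_id': 'b', 'ts': 50},   # happens after the request of session 'b'
]
sample_requests = [
    {'session_id': 'a', 'ts': 400},
    {'session_id': 'b', 'ts': 40},
]

expected = count_pageviews_before_requests(sample_pageviews, sample_requests)
```

The streaming pipeline should produce the same counts, only incrementally and with low latency.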


What I tried

To load the requests I use this snippet, which loads the requests from a pubsub subscription and then extracts the session_id as key. Lastly, I apply a window which emits every request directly when it is received.

requests = (p
    | 'Read Requests' >> (
        beam.io.ReadFromPubSub(subscription=request_sub)
        | 'Extract'        >> beam.Map(lambda x: json.loads(x))
        | 'Session as Key' >> beam.Map(lambda request: (request['session_id'], request))
        | 'Window'         >> beam.WindowInto(
            window.SlidingWindows(5 * 60, 1 * 60, 0),
            trigger=trigger.AfterCount(1),
            accumulation_mode=trigger.AccumulationMode.DISCARDING
        )
    )
)

Similarly, this snippet loads the pageviews and applies a sliding window that fires in accumulating mode whenever a pageview arrives.

pageviews = (p
    | 'Read Pageviews' >> (
        beam.io.ReadFromPubSub(subscription=pageview_sub)
        | 'Extract'        >> beam.Map(lambda x: json.loads(x))
        | 'Session as Key' >> beam.Map(lambda pageview: (pageview['session_id'], pageview))
        | 'Window'         >> beam.WindowInto(
            windowfn=window.SlidingWindows(5 * 60, 1 * 60, 0),
            trigger=trigger.AfterCount(1),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
        )
    )
)

To apply the join, I tried

combined = (
        {
            'requests': requests,
            'pageviews': pageviews
        }
    | 'Merge' >> beam.CoGroupByKey()
    | 'Print' >> beam.Map(print)
)

When I run this pipeline, the merged rows never contain both requests and pageviews; each row has only one of the two.

My idea was to keep only the pageviews that occurred before the request and count them after the CoGroupByKey. What do I need to do? I suppose my problem is with the windowing and triggering strategy.

It's also quite important that requests are processed with low latency, even if that means discarding pageviews that arrive late.

1 Answer

I found a solution myself. Here it is, in case somebody is interested:

Idea

The trick is to combine the two streams with beam.Flatten and to use a stateful DoFn to compute the number of pageviews before each request. Each stream contains JSON dictionaries. I tagged every event with its type, as ('request', request) or ('pageview', pageview), so that the stateful DoFn can tell the events apart. Along the way I also compute the timestamp of the first pageview and the seconds elapsed since it. Both streams have to be keyed by session_id, so that the stateful DoFn sees all events of one session together, and only those.
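To make the idea concrete before the Beam code, here is a minimal plain-Python model of the tag, flatten and per-key state steps. It is only a sketch and not Beam code: the function name, the 'ts' field (epoch seconds) and the sample events are hypothetical, and sorting by timestamp stands in for arrival order.

```python
def count_pageviews_per_session(events):
    """Plain-Python model of the per-key state; not Beam code.

    `events` is an iterable of (session_id, (event_type, event)) tuples in
    arrival order.
    """
    state = {}  # session_id -> [num_pageviews, first_pageview_ts]
    for session_id, (event_type, event) in events:
        session_state = state.setdefault(session_id, [0, None])
        if event_type == 'pageview':
            # Pageviews only update the state, nothing is emitted
            session_state[0] += 1
            if session_state[1] is None or event['ts'] < session_state[1]:
                session_state[1] = event['ts']
        elif event_type == 'request':
            # Requests are emitted together with the current state
            extended = dict(event,
                            num_pageviews=session_state[0],
                            first_pageview=session_state[1])
            session_state[0] = 0  # the counter is reset after every request
            yield session_id, extended


# Two hypothetical input streams, already keyed by session_id
pageviews = [('a', {'ts': 10}), ('b', {'ts': 12}), ('a', {'ts': 20})]
requests = [('a', {'ts': 30}), ('b', {'ts': 31}), ('a', {'ts': 40})]

# "Flatten": tag each event with its type and merge both streams
flattened = sorted(
    [(sid, ('pageview', e)) for sid, e in pageviews]
    + [(sid, ('request', e)) for sid, e in requests],
    key=lambda kv: kv[1][1]['ts'],
)

results = list(count_pageviews_per_session(flattened))
```

In a real pipeline the arrival order is not guaranteed to match event time, so a pageview that arrives after its request is simply not counted for that request.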

Code

First of all, this is the pipeline code:

import apache_beam as beam
from apache_beam.transforms import trigger, userstate, window

# Beam pipeline that extends each request with the number of pageviews seen before it in that session
with beam.Pipeline(options=options) as p:
    # The stream of requests
    requests = (
          'Read from PubSub subscription'   >> beam.io.ReadFromPubSub(subscription=request_sub)
        | 'Extract JSON'                    >> beam.ParDo(ExtractJSON())
        | 'Add Timestamp'                   >> beam.ParDo(AssignTimestampFn())
        | 'Use Session ID as stream key'    >> beam.Map(lambda request: (request['session_id'], request))
        | 'Add type of event'               >> beam.Map(lambda r: (r[0], ('request', r[1])))
    )

    # The stream of pageviews
    pageviews = (
          'Read from PubSub subscription'   >> beam.io.ReadFromPubSub(subscription=pageview_sub)
        | 'Extract JSON'                    >> beam.ParDo(ExtractJSON())
        | 'Add Timestamp'                   >> beam.ParDo(AssignTimestampFn())
        | 'Use Session ID as stream key'    >> beam.Map(lambda pageview: (pageview['session_id'], pageview))
        | 'Add type of event'               >> beam.Map(lambda p: (p[0], ('pageview', p[1])))
    )

    # Combine the streams and apply Stateful DoFn
    combined = (
        (
            p | ('Prepare requests stream' >> requests),
            p | ('Prepare pageviews stream' >> pageviews)
        )
        | 'Combine event streams'       >> beam.Flatten()
        | 'Global Window'               >> beam.WindowInto(windowfn=window.GlobalWindows(),
                                                            trigger=trigger.AfterCount(1),
                                                            accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | 'Stateful DoFn'               >> beam.ParDo(CountPageviewsStateful())
        | 'Compute processing delay'    >> beam.ParDo(LogTimeDelay())
        | 'Format for BigQuery output'  >> beam.ParDo(FormatForOutputDoFn())
    )

    # Write to BigQuery.
    combined | 'Write' >> beam.io.WriteToBigQuery(
        requests_extended_table,
        schema=REQUESTS_EXTENDED_TABLE_SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

The interesting part is the combination of the two streams with beam.Flatten, followed by the stateful DoFn CountPageviewsStateful.

Here is the code of the custom DoFns used:

# This DoFn just loads a json message
class ExtractJSON(beam.DoFn):
  def process(self, element):
    import json

    yield json.loads(element)

# This DoFn adds the event timestamp of each message to its JSON element for further processing
class AssignTimestampFn(beam.DoFn):
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    import datetime

    # Copy the element instead of mutating the DoFn input in place
    timestamped_element = dict(element)
    # Use a timezone-aware UTC datetime, so that a later .timestamp() call returns
    # the original epoch seconds regardless of the worker's local timezone
    timestamp_utc = datetime.datetime.fromtimestamp(float(timestamp), tz=datetime.timezone.utc)
    timestamped_element['timestamp_utc'] = timestamp_utc
    timestamped_element['timestamp'] = timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
    yield timestamped_element

# This class is a stateful DoFn
# Input elements should be of the form (session_id, (event_type, event)),
# where event_type is either 'request' or 'pageview'
# It keeps, on a per-session basis, the number of pageviews and the first pageview timestamp
# in its internal state
# Whenever a request comes in, it appends the internal state to the request and emits
# an extended request
# Whenever a pageview comes in, the internal state is updated but nothing is emitted
class CountPageviewsStateful(beam.DoFn):
  # The internal states
  NUM_PAGEVIEWS = userstate.CombiningValueStateSpec('num_pageviews', combine_fn=sum)
  FIRST_PAGEVIEW = userstate.ReadModifyWriteStateSpec('first_pageview', coder=beam.coders.VarIntCoder())

  def process(self,
              element,
              num_pageviews_state=beam.DoFn.StateParam(NUM_PAGEVIEWS),
              first_pageview_state=beam.DoFn.StateParam(FIRST_PAGEVIEW)
              ):
    import datetime

    # Extract element
    session_id = element[0]
    event_type, event = element[1]

    # Process different event types
    # Depending on event type, different actions are done
    if event_type == 'request':
        # This is a request
        request = event

        # First, the first pageview timestamp is extracted and the seconds since first timestamp are calculated
        first_pageview = first_pageview_state.read()
        if first_pageview is not None:
            seconds_since_first_pageview = (int(request['timestamp_utc'].timestamp()) - first_pageview)

            first_pageview_timestamp_utc = datetime.datetime.utcfromtimestamp(float(first_pageview))
            first_pageview_timestamp = first_pageview_timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
        else:
            seconds_since_first_pageview = -1
            first_pageview_timestamp = None

        # The calculated data is appended to the request
        request['num_pageviews'] = num_pageviews_state.read()
        request['first_pageview_timestamp'] = first_pageview_timestamp
        request['seconds_since_first_pageview'] = seconds_since_first_pageview
        
        # The pageview counter is reset
        num_pageviews_state.clear()
        
        # The request is returned
        yield (session_id, request)
    elif event_type == 'pageview':
        # This is a pageview
        pageview = event

        # Update first pageview state
        first_pageview = first_pageview_state.read()
        if first_pageview is None:
            first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))
        elif first_pageview > int(pageview['timestamp_utc'].timestamp()):
            first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))

        # Increase number of pageviews
        num_pageviews_state.add(1)
          
        # Do not return anything, pageviews are not further processed

# This DoFn logs the delay between the event time and the processing time
class LogTimeDelay(beam.DoFn):
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    import datetime
    import logging

    timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
    seconds_delay = (datetime.datetime.utcnow() - timestamp_utc).total_seconds()

    logging.warning('Delayed by %s seconds', seconds_delay)

    yield element

This seems to work and gives me an average delay of about 1-2 seconds on the direct runner. On Cloud Dataflow the average delay is about 0.5-1 seconds. All in all, this seems to solve the problem as stated.

Further considerations

There are some open questions, though:

  • I am using global windows, which, as far as I can tell, means the internal state is kept forever. Maybe session windows are the right way to go: when there are no pageviews or requests for x seconds, the window closes and the internal state is released.
  • Processing delay is a little bit high, but maybe I need to tweak the pubsub part a little bit.
  • I do not know how much overhead or memory consumption this solution adds over standard beam methods. I also didn't test high workload and parallelisation.
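
The expiry rule from the first point can be sketched in plain Python. This is only a model of "drop a session's state after x seconds of inactivity" and not Beam code: the class name, the method names and the gap value are hypothetical. In Beam the same effect would have to come from session windows or from a timer that clears the state, which I have not tried.

```python
class ExpiringSessionCounts:
    """Plain-Python model of per-session state that is dropped after a gap."""

    def __init__(self, gap_seconds):
        self.gap_seconds = gap_seconds
        self._state = {}  # session_id -> [last_seen_ts, num_pageviews]

    def _expire(self, now):
        # Drop every session that has been inactive for longer than the gap
        stale = [sid for sid, (last_seen, _) in self._state.items()
                 if now - last_seen > self.gap_seconds]
        for sid in stale:
            del self._state[sid]

    def add_pageview(self, session_id, ts):
        self._expire(ts)
        entry = self._state.setdefault(session_id, [ts, 0])
        entry[0] = ts
        entry[1] += 1

    def pageviews_before_request(self, session_id, ts):
        self._expire(ts)
        entry = self._state.get(session_id)
        if entry is None:
            return 0
        entry[0] = ts  # a request also keeps the session alive
        return entry[1]

    def __len__(self):
        return len(self._state)


store = ExpiringSessionCounts(gap_seconds=60)
store.add_pageview('a', 0)
store.add_pageview('a', 10)
store.add_pageview('b', 20)

# Session 'a' is still active at t=30
count_a = store.pageviews_before_request('a', 30)
# At t=100 both sessions have exceeded the gap, so their state is gone
count_b_late = store.pageviews_before_request('b', 100)
```

With a rule like this the amount of state is bounded by the number of sessions active within the gap, instead of growing with every session ever seen.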