I'm quite new to Apache Beam and have implemented my first pipelines.
But I've now reached a point where I'm confused about how to combine windowing and joining.
Problem definition:
I have two streams of data: one with the users' pageviews and another with the users' requests. They share the key session_id, which identifies the user's session, but each stream carries additional data of its own.
The goal is to compute the number of pageviews in a session before a request happened. That is, I want a stream that contains every request together with the number of pageviews that preceded it. It suffices to consider the pageviews of, let's say, the last 5 minutes.
What I tried
To load the requests I use this snippet, which reads them from a Pub/Sub subscription and then extracts the session_id as the key. Lastly, I apply a window that is meant to emit every request as soon as it is received.
    requests = (p
        | 'Read Requests' >> (
            beam.io.ReadFromPubSub(subscription=request_sub)
            | 'Extract' >> beam.Map(lambda x: json.loads(x))
            | 'Session as Key' >> beam.Map(lambda request: (request['session_id'], request))
            | 'Window' >> beam.WindowInto(window.SlidingWindows(5 * 60, 1 * 60, 0),
                trigger=trigger.AfterCount(1),
                accumulation_mode=trigger.AccumulationMode.DISCARDING
            )
        )
    )
Similarly, this snippet loads the pageviews and applies a sliding window that fires in accumulating mode whenever a pageview arrives.
    pageviews = (p
        | 'Read Pageviews' >> (
            beam.io.ReadFromPubSub(subscription=pageview_sub)
            | 'Extract' >> beam.Map(lambda x: json.loads(x))
            | 'Session as Key' >> beam.Map(lambda pageview: (pageview['session_id'], pageview))
            | 'Window' >> beam.WindowInto(
                windowfn=window.SlidingWindows(5 * 60, 1 * 60, 0),
                trigger=trigger.AfterCount(1),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING
            )
        )
    )
To apply the join, I tried
    combined = (
        {
            'requests': requests,
            'pageviews': pageviews
        }
        | 'Merge' >> beam.CoGroupByKey()
        | 'Print' >> beam.Map(print)
    )
When I run this pipeline, the merged rows never contain both requests and pageviews; each row has only one of the two.
My idea was to select the pageviews that came before the request and count them after the CoGroupByKey. What do I need to do? I suppose my problem is with the windowing and triggering strategy.
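The counting step I have in mind after the join would look roughly like the sketch below. It assumes every request and pageview dict has a numeric `timestamp` field in seconds, which my snippets do not show, and `count_pageviews_before` is a hypothetical helper name. It only makes sense once a joined row actually contains both sides, which is exactly what does not happen at the moment.

```python
def count_pageviews_before(element, lookback=5 * 60):
    """Yield every request of a session with the number of preceding pageviews.

    `element` has the shape that CoGroupByKey emits for the dict input above:
    (session_id, {'requests': [...], 'pageviews': [...]}).
    The 'timestamp' field is an assumption, not part of the original data.
    """
    session_id, grouped = element
    pageview_ts = [pv['timestamp'] for pv in grouped['pageviews']]
    for request in grouped['requests']:
        ts = request['timestamp']
        # Keep only pageviews inside the lookback period and strictly before the request.
        count = sum(1 for t in pageview_ts if ts - lookback <= t < ts)
        yield session_id, {**request, 'pageviews_before': count}


# Hand-built element standing in for one joined row.
row = ('s1', {
    'requests': [{'session_id': 's1', 'timestamp': 600}],
    'pageviews': [{'timestamp': 100}, {'timestamp': 400}, {'timestamp': 650}],
})
for result in count_pageviews_before(row):
    print(result)
```

In the pipeline this would replace the 'Print' step with something like `beam.FlatMap(count_pageviews_before)`.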
It's also quite important that the requests are processed with low latency, possibly discarding pageviews that arrive late.