I am trying to create my first pipleine in dataflow, I have the same code runnign when i execute using the interactive beam runner but on dataflow I get all sort of errors, which are not making much sense to me.
I am getting json from pub sub which is of the following format.
Here is the code of my pipeline.
from __future__ import absolute_import
import apache_beam as beam
#from apache_beam.runners.interactive import interactive_runner
#import apache_beam.runners.interactive.interactive_beam as ib
import google.auth
from datetime import timedelta
import json
from datetime import datetime
from apache_beam import window
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode, AfterCount
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
import argparse
import logging
from time import mktime
def setTimestamp(elem):
from apache_beam import window
yield window.TimestampedValue(elem, elem['timestamp'])
def createTuples(elem):
yield (elem["sessionId"], elem)
class WriteToBigQuery(beam.PTransform):
"""Generate, format, and write BigQuery table row information."""
def __init__(self, table_name, dataset, schema, project):
"""Initializes the transform.
table_name: Name of the BigQuery table to use.
dataset: Name of the dataset to use.
schema: Dictionary in the format {'column_name': 'bigquery_type'}
project: Name of the Cloud project containing BigQuery table.
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
#super(WriteToBigQuery, self).__init__()
self.table_name = table_name
self.dataset = dataset
self.schema = schema
self.project = project
def get_schema(self):
"""Build the output table schema."""
return ', '.join('%s:%s' % (col, self.schema[col]) for col in self.schema)
def expand(self, pcoll):
return (
| 'ConvertToRow' >>
beam.Map(lambda elem: {col: elem[col]
for col in self.schema})
| beam.io.WriteToBigQuery(
self.table_name, self.dataset, self.project, self.get_schema()))
class ParseSessionEventFn(beam.DoFn):
"""Parses the raw game event info into a Python dictionary.
Each event line has the following format:
user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
The human-readable time string is not used here.
def __init__(self):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
#super(ParseSessionEventFn, self).__init__()
def process(self, elem):
#timestamp = mktime(datetime.strptime(elem["timestamp"], "%Y-%m-%d %H:%M:%S").utctimetuple())
elem['sessionId'] = int(elem['sessionId'])
elem['landingPage'] = int(elem['landingPage'])
yield elem
class AnalyzeSessions(beam.DoFn):
"""Parses the raw game event info into a Python dictionary.
Each event line has the following format:
user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
The human-readable time string is not used here.
def __init__(self):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
#super(AnalyzeSessions, self).__init__()
def process(self, elem, window=beam.DoFn.WindowParam):
sessionId = elem[0]
uiud = elem[1][0]["uiud"]
count_of_events = 0
pageUrl = []
window_end = window.end.to_utc_datetime()
window_start = window.start.to_utc_datetime()
session_duration = window_end - window_start
for rows in elem[1]:
if rows["landingPage"] == 1:
referrer = rows["refererr"]
return {
"session_duration": session_duration,
"window_start" : window_start
def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
parser.add_argument('--topic', type=str, help='Pub/Sub topic to read from')
'--subscription', type=str, help='Pub/Sub subscription to read from')
help='BigQuery Dataset to write tables to. '
'Must already exist.')
help='The BigQuery table name. Should not already exist.')
help='Numeric value of fixed window duration for user '
'analysis, in minutes')
help='Numeric value of gap between user sessions, '
'in minutes')
help='Numeric value of fixed window for finding mean of '
'user session duration, in minutes')
args, pipeline_args = parser.parse_known_args(argv)
session_gap = args.session_gap * 60
options = PipelineOptions(pipeline_args)
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(StandardOptions).streaming = True
options.view_as( StandardOptions).runner= 'DataflowRunner'
options.view_as(SetupOptions).save_main_session = save_main_session
p = beam.Pipeline(options=options)
lines = (p
| beam.io.ReadFromPubSub(
| 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
| beam.Map(lambda x: json.loads(x))
| beam.ParDo(ParseSessionEventFn())
next = ( lines
| 'AddEventTimestamps' >> beam.Map(setTimestamp)
| 'Create Tuples' >> beam.Map(createTuples)
| beam.Map(print)
| 'Window' >> beam.WindowInto(window.Sessions(15))
| 'group by key' >> beam.GroupByKey()
| 'analyze sessions' >> beam.ParDo(AnalyzeSessions())
| 'WriteTeamScoreSums' >> WriteToBigQuery(
"session_duration": 'INTEGER',
"window_start" : 'TIMESTAMP'
next1 = ( next
| 'Create Tuples' >> beam.Map(createTuples)
| beam.Map(print)
result = p.run()
# result.wait_till_termination()
if __name__ == '__main__':
In the following code, I get the following error 'generator' object is not subscriptable, when I try to create tuples in my pipeline. I get it using yield is creating the generator object, even return doesn't work it just beaks my pipeline.
apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables File "sessiontest1.py", line 23, in createTuples TypeError: 'generator' object is not subscriptable [while running 'generatedPtransform-148']
Here is the code I use to execute the pipeline.
python3 sessiontest1.py --project phrasal-bond-xxxxx --region us-central1 --subscription projects/phrasal-bond-xxxxx/s
ubscriptions/xxxxxx --dataset sessions_beam --runner DataflowRunner --temp_location gs://webevents/sessions --service_account_email-xxxxxxxx-
[email protected]
Any help on this would be appreciated. Thanks guys, again first time working on dataflow, so not sure what I am missing here.
Other errors I was getting before that are sorted now:-
a) I get the error that widow is not defined from the line name beam.Map(lambda elem: window.TimestampedValue(elem, elem['timestamp'])) .
If I go beam.window then it says beam is not defined, according to me beam should be provided by dataflow,
NameError: name 'window' is not defined [while running 'generatedPtransform-3820']
You just need to import the modules in the function itself.
beam.Map(lambda elem: beam.window.TimestampedValue(..
as stated in the documentation, here. Did it work? – Alexandre Moraes