
I am trying to create my first pipeline in Dataflow. The same code runs when I execute it with the interactive Beam runner, but on Dataflow I get all sorts of errors that don't make much sense to me.

I am getting JSON from Pub/Sub, which is of the following format.


Here is the code of my pipeline.

from __future__ import absolute_import
import apache_beam as beam
#from apache_beam.runners.interactive import interactive_runner
#import apache_beam.runners.interactive.interactive_beam as ib
import google.auth
from datetime import timedelta
import json
from datetime import datetime
from apache_beam import window
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode, AfterCount
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
import argparse
import logging
from time import mktime

def setTimestamp(elem):
     from apache_beam import window
     yield window.TimestampedValue(elem, elem['timestamp'])

def createTuples(elem):
     yield (elem["sessionId"], elem)

class WriteToBigQuery(beam.PTransform):
  """Generate, format, and write BigQuery table row information."""
  def __init__(self, table_name, dataset, schema, project):
    """Initializes the transform.
      table_name: Name of the BigQuery table to use.
      dataset: Name of the dataset to use.
      schema: Dictionary in the format {'column_name': 'bigquery_type'}
      project: Name of the Cloud project containing BigQuery table.
    """
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    #super(WriteToBigQuery, self).__init__()
    beam.PTransform.__init__(self)
    self.table_name = table_name
    self.dataset = dataset
    self.schema = schema
    self.project = project

  def get_schema(self):
    """Build the output table schema."""
    return ', '.join('%s:%s' % (col, self.schema[col]) for col in self.schema)

  def expand(self, pcoll):
    return (
        pcoll
        | 'ConvertToRow' >>
        beam.Map(lambda elem: {col: elem[col]
                               for col in self.schema})
        | beam.io.WriteToBigQuery(
            self.table_name, self.dataset, self.project, self.get_schema()))

class ParseSessionEventFn(beam.DoFn):
  """Parses the raw game event info into a Python dictionary.
  Each event line has the following format:
    user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
  The human-readable time string is not used here.
  """
  def __init__(self):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    #super(ParseSessionEventFn, self).__init__()
    beam.DoFn.__init__(self)

  def process(self, elem):
          #timestamp = mktime(datetime.strptime(elem["timestamp"], "%Y-%m-%d %H:%M:%S").utctimetuple())
          elem['sessionId'] = int(elem['sessionId'])
          elem['landingPage'] = int(elem['landingPage'])
          yield elem

class AnalyzeSessions(beam.DoFn):
  """Parses the raw game event info into a Python dictionary.
  Each event line has the following format:
    user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
  The human-readable time string is not used here.
  """
  def __init__(self):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    #super(AnalyzeSessions, self).__init__()
    beam.DoFn.__init__(self)

  def process(self, elem, window=beam.DoFn.WindowParam):
          sessionId = elem[0]
          uiud = elem[1][0]["uiud"]
          count_of_events = 0
          pageUrl = []
          window_end = window.end.to_utc_datetime()
          window_start = window.start.to_utc_datetime()
          session_duration = window_end - window_start
          for rows in elem[1]:
             if rows["landingPage"] == 1:
                    referrer = rows["refererr"]

          return {
              "session_duration": session_duration,
              "window_start": window_start
          }

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument('--topic', type=str, help='Pub/Sub topic to read from')
          '--subscription', type=str, help='Pub/Sub subscription to read from')
          help='BigQuery Dataset to write tables to. '
          'Must already exist.')
          help='The BigQuery table name. Should not already exist.')
          help='Numeric value of fixed window duration for user '
          'analysis, in minutes')
          help='Numeric value of gap between user sessions, '
          'in minutes')
          help='Numeric value of fixed window for finding mean of '
          'user session duration, in minutes')
    args, pipeline_args = parser.parse_known_args(argv)
    session_gap = args.session_gap * 60
    options = PipelineOptions(pipeline_args)
    # Set the pipeline mode to stream the data from Pub/Sub.
    options.view_as(StandardOptions).streaming = True

    options.view_as( StandardOptions).runner= 'DataflowRunner'
    options.view_as(SetupOptions).save_main_session = save_main_session
    p = beam.Pipeline(options=options)
    lines = (p
                | beam.io.ReadFromPubSub(
             | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
             | beam.Map(lambda x: json.loads(x))
             | beam.ParDo(ParseSessionEventFn())

    next = ( lines
                | 'AddEventTimestamps' >> beam.Map(setTimestamp)
                | 'Create Tuples' >> beam.Map(createTuples)
                | beam.Map(print) 
                | 'Window' >> beam.WindowInto(window.Sessions(15))
                | 'group by key' >> beam.GroupByKey()          
                | 'analyze sessions' >> beam.ParDo(AnalyzeSessions())         
                | 'WriteTeamScoreSums' >> WriteToBigQuery(

               "session_duration": 'INTEGER',
               "window_start" : 'TIMESTAMP'

    next1 = ( next
             | 'Create Tuples' >> beam.Map(createTuples)
             | beam.Map(print) 


    result = p.run()
#    result.wait_till_termination()

if __name__ == '__main__':
    run()

With the code above, I get the error 'generator' object is not subscriptable when I try to create tuples in my pipeline. I understand that using yield creates a generator object, but even return doesn't work; it just breaks my pipeline.

apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables File "sessiontest1.py", line 23, in createTuples TypeError: 'generator' object is not subscriptable [while running 'generatedPtransform-148']

Here is the code I use to execute the pipeline.

python3 sessiontest1.py --project phrasal-bond-xxxxx --region us-central1 --subscription projects/phrasal-bond-xxxxx/subscriptions/xxxxxx --dataset sessions_beam --runner DataflowRunner --temp_location gs://webevents/sessions --service_account_email-xxxxxxxx-[email protected]

Any help on this would be appreciated. Thanks, guys. Again, this is my first time working on Dataflow, so I'm not sure what I am missing here.

Other errors I was getting before, which are sorted now:

a) I got an error that window is not defined, from the line beam.Map(lambda elem: window.TimestampedValue(elem, elem['timestamp'])).

If I use beam.window instead, it says beam is not defined. As I understand it, beam should be provided by Dataflow.

NameError: name 'window' is not defined [while running 'generatedPtransform-3820']

You just need to import the modules in the function itself.

Can you use beam.window inside your lambda function and inside the WindowInto() method? As follows: beam.Map(lambda elem: beam.window.TimestampedValue(.. and .WindowInto(beam.window.Sessions(.. as stated in the documentation. Did it work? - Alexandre Moraes
@AlexandreMoraes Thanks for the input. I got that part working, but I am running into a different issue now: 'generator' object is not subscriptable. This happens when I try to create tuples so that I can group them by key. I changed my code above so that you can see the changes. Any idea why that may be happening? - pavneet tiwana

2 Answers


Getting a 'generator' object is not subscriptable error in createTuples indicates that when you evaluate elem["sessionId"], elem is already a generator. The previous transform, setTimestamp, also uses yield. Because it is applied with beam.Map, the generator object it returns is emitted as a single element and passed as the element to createTuples.
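
To see the mechanism in isolation, here is a minimal sketch in plain Python with no Beam dependency. The helper fake_map is a hypothetical stand-in that only mimics how beam.Map emits whatever the function returns as one element; the function names and the sample event are made up for illustration.

```python
def set_timestamp_yield(elem):
    # Generator function: calling it returns a generator object, not elem.
    yield elem


def create_tuples_yield(elem):
    # Fails once the body runs if elem is a generator instead of a dict.
    yield (elem["sessionId"], elem)


def fake_map(fn, elements):
    """Hypothetical stand-in for beam.Map: one output per input, fn(x) as-is."""
    return [fn(x) for x in elements]


# Illustrative event; the field names follow the question's code.
events = [{"sessionId": 1, "timestamp": 1600000000}]

stamped = fake_map(set_timestamp_yield, events)  # a list of generator objects
keyed = fake_map(create_tuples_yield, stamped)   # no error yet: bodies are lazy

try:
    next(keyed[0])  # the body of create_tuples_yield runs only now
    error = None
except TypeError as exc:
    error = str(exc)

print(error)
```

The error only surfaces when something iterates over the generator, which would be consistent with the traceback in the question pointing at a coder rather than at the Map step itself.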

The solution here is to implement setTimestamp and createTuples with return instead of yield. Return the element you want to receive in the following transform.
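
A minimal sketch of that change, assuming elem['timestamp'] already holds a numeric Unix timestamp in seconds. The snake_case names are my own renaming of the functions in the question, and the import stays inside the function as in the question's workaround for the NameError.

```python
def set_timestamp(elem):
    # Imported inside the function so the name resolves on the workers,
    # mirroring the workaround described in the question.
    from apache_beam import window
    # return, not yield: beam.Map emits exactly this value as the element.
    return window.TimestampedValue(elem, elem["timestamp"])


def create_tuples(elem):
    # Returns a (key, value) pair that GroupByKey can consume.
    return (elem["sessionId"], elem)


# Usage inside the pipeline from the question (step labels unchanged):
#   | 'AddEventTimestamps' >> beam.Map(set_timestamp)
#   | 'Create Tuples' >> beam.Map(create_tuples)
#
# Alternatively, keep the yield-based functions and apply them with
# beam.FlatMap, which iterates over the returned generator and emits
# each yielded item as its own element.
```

Either option should work; Map with return is the closer fit here because each input produces exactly one output.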


You should set save_main_session = True in your code (try uncommenting that line in your code). See more about NameError here: https://cloud.google.com/dataflow/docs/resources/faq