
I'm new to Apache Beam, so I'm struggling a bit with the following scenario:

  • Pub/Sub topic read in streaming mode
  • Transform to extract the customerId
  • Parallel PCollection with a Transform/ParDo that fetches data from Firestore based on the "customerId" received from the Pub/Sub topic (using a side input)
  • ...

The ParDo transform that tries to fetch Firestore data does not run at all. If I use a fixed "customerId" value it works as expected ... although then it's not a proper fetch from Firestore (just a simple ParDo). Am I doing something that isn't supported? I'm including my code below:

import argparse
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    PipelineOptions, SetupOptions, StandardOptions)


class getFirestoreUsers(beam.DoFn):
    def process(self, element, customerId):

        print(f'Getting Users from Firestore, ID: {customerId}')

        # Call function to initialize Database
        db = intializeFirebase()

        """ # get customer information from the database
        doc = db.document(f'Customers/{customerId}').get()
        customer = doc.to_dict() """
        usersList = {}

        # Get Optin Users
        try:
            docs = db.collection(
                f'Customers/{customerId}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
            usersList = {user.id: user.to_dict() for user in docs}
        except Exception as err:
            print("Error: couldn't retrieve OPTIN users from DevicesWiFi")
            print(err)

        return [usersList]

Main code

def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--topic',
        type=str,
        help='Pub/Sub topic to read from')
    parser.add_argument(
        '--output',
        help=('Output local filename'))

    args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    p = beam.Pipeline(options=options)

    users = (p | 'Create chars' >> beam.Create([
        {
             "clientMac": "7c:d9:5c:b8:6f:38",
             "username": "Louis"
             },
        {
            "clientMac": "48:fd:8e:b0:6f:38",
            "username": "Paul"
        }
    ]))


    # Get Dictionary from Pub/Sub
    data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
            | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e))
            )

    # Get customerId from Pub/Sub information
    PcustomerId = (data | 'get customerId from Firestore' >>
                   beam.ParDo(lambda x: [x.get('customerId')]))
    PcustomerId | 'print customerId' >> beam.Map(print)

    # Get Users from Firestore
    custUsers = (users | 'Read from Firestore' >> beam.ParDo(
        getFirestoreUsers(), customerId=beam.pvalue.AsSingleton(PcustomerId)))
    custUsers | 'print Users from Firestore' >> beam.Map(print)

To avoid errors when running the pipeline, I had to initialise the "users" dictionary, which I completely ignore afterwards. I suppose I have several errors here, so your help is much appreciated.

I assume PcustomerId | 'print customerId' >> beam.Map(print) will result in an empty PCollection, and your Read from Firestore will likely wait for the side input forever. I'm curious why you feed the customerId into 'Read from Firestore' as a side input instead of the main input. Note that if you are using streaming mode without explicit windowing, the default is the global window, and it doesn't produce any output unless you have a proper trigger (a sketch of this follows the comments). — Yichi Zhang
Hi Yichi, thanks for answering. I tried so many different approaches to the problem that I don't recall the exact output. The issue I'm facing is that, due to Beam's parallel processing, the data needed (the customerId first and the customer's data after) is not ready when the "main" PCollection with the original JSON data from Pub/Sub runs. I needed a "wait" option, which I think is not yet available in Python ... is there any other way to do this? — Rui Bras Fernandes
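
For reference, a minimal sketch of the windowing point raised in the first comment, assuming the question's data and users PCollections and an arbitrary 60-second window: both the main input and the side input need compatible explicit windows before an AsSingleton side input can fire in streaming mode.

import apache_beam as beam
from apache_beam.transforms import window

# Hedged sketch: window both the main collection and the side-input
# collection so the AsSingleton side input can materialize per window.
# 'data' and 'users' are the PCollections from the question; the
# 60-second window size is an arbitrary assumption.
windowed_ids = (data
                | 'Window ids' >> beam.WindowInto(window.FixedWindows(60))
                | 'Extract customerId' >> beam.Map(lambda x: x.get('customerId')))

windowed_users = users | 'Window users' >> beam.WindowInto(window.FixedWindows(60))

custUsers = (windowed_users | 'Read from Firestore (windowed)' >> beam.ParDo(
    getFirestoreUsers(), customerId=beam.pvalue.AsSingleton(windowed_ids)))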

1 Answer


It's not clear to me how the users PCollection is used in the example code (since element is never processed in the process definition). I've re-arranged the code a little, adding windowing, and used the customer_id as the main input.

import json

import apache_beam as beam
from apache_beam.transforms import window


class GetFirestoreUsers(beam.DoFn):
  def setup(self):
    # Call function to initialize Database
    self.db = intializeFirebase()

  def process(self, element):
    print(f'Getting Users from Firestore, ID: {element}')

    """ # get customer information from the database
    doc = self.db.document(f'Customers/{element}').get()
    customer = doc.to_dict() """
    usersList = {}

    # Get Optin Users
    try:
        docs = self.db.collection(
            f'Customers/{element}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
        usersList = {user.id: user.to_dict() for user in docs}
    except Exception as err:
        print("Error: couldn't retrieve OPTIN users from DevicesWiFi")
        print(err)

    return [usersList]



data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
          | beam.WindowInto(window.FixedWindows(60))
          | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e)))

# Get customerId from Pub/Sub information
customer_id = (data | 'get customerId' >>
               beam.Map(lambda x: x.get('customerId')))
customer_id | 'print customerId' >> beam.Map(print)

# Get Users from Firestore
custUsers = (customer_id | 'Read from Firestore' >> beam.ParDo(
    GetFirestoreUsers()))
custUsers | 'print Users from Firestore' >> beam.Map(print)

From your comment:

the data needed (the customerId first and the customer's data after) is not ready when the "main" PCollection with the original JSON data from Pub/Sub runs

Did you mean the data in Firestore is not ready when the Pub/Sub topic is being read?

You can always split the logic into two pipelines in your main function and run them one after another, as sketched below.
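
A minimal sketch of that idea, under the assumption that the first stage can be expressed as a bounded/batch pipeline (an unbounded streaming pipeline never finishes): wait_until_finish() blocks, so the second pipeline only starts once the first has completed.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()

# First pipeline: e.g. fetch/prepare the customer data the second one needs.
p1 = beam.Pipeline(options=options)
# ... build the first pipeline here ...
p1.run().wait_until_finish()  # blocks until the first pipeline is done

# Second pipeline: starts only after the first has completed.
p2 = beam.Pipeline(options=options)
# ... build the second pipeline here ...
p2.run().wait_until_finish()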