
I am new to Dataflow. My pipeline has the following logic:

  1. An event is published to Pub/Sub.
  2. Dataflow reads the event from Pub/Sub.
  3. For each event, I query MySQL to find the segments the event is related to. This step returns a list of segments, and the segments are independent of one another.
  4. Each segment can be split into two MySQL result tables, one for email and one for mobile, and these are independent as well.
  5. Each segment has 1 to n rules. I would like to process the rules in parallel and collect all of the results. I have tried to use windows, but I am not sure how to write the logic so that the combined results of all rules in one segment arrive together in a final function, which then writes to MySQL depending on the rule results (booleans).
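
The intended flow of steps 3 to 5 can be modelled in plain Python, without Beam, to make the fan-out and fan-in explicit. This is only a sketch: the lookup tables, the rule lambdas, the `(message_id, segment, channel)` key and the use of `all()` as the final decision are hypothetical stand-ins for the MySQL queries and the real rule logic.

```python
from collections import defaultdict

# Hypothetical stand-ins for the MySQL lookups in steps 3 to 5.
SEGMENTS_BY_EVENT = {"signup": ["seg_a", "seg_b"]}
RULES_BY_SEGMENT = {
    ("seg_a", "email"): [lambda e: e["age"] >= 18, lambda e: e["country"] == "DE"],
    ("seg_a", "mobile"): [lambda e: e["age"] >= 18],
    ("seg_b", "email"): [lambda e: e["age"] >= 65],
    ("seg_b", "mobile"): [],
}


def fan_out(message_id, event):
    """Yield one (key, rule) pair per rule so each rule can be evaluated on its own."""
    for segment in SEGMENTS_BY_EVENT.get(event["type"], []):
        for channel in ("email", "mobile"):
            for rule in RULES_BY_SEGMENT.get((segment, channel), []):
                yield (message_id, segment, channel), rule


def evaluate(keyed_rule, event):
    """Evaluate a single rule and keep the key so results can be regrouped later."""
    key, rule = keyed_rule
    return key, bool(rule(event))


def fan_in(keyed_results):
    """Collect every boolean result that shares the same key."""
    grouped = defaultdict(list)
    for key, result in keyed_results:
        grouped[key].append(result)
    return dict(grouped)


event = {"type": "signup", "age": 30, "country": "DE"}
results = fan_in(evaluate(kr, event) for kr in fan_out("msg-1", event))

# Assumed final logic: a segment/channel passes only if all of its rules pass.
decisions = {key: all(values) for key, values in results.items()}
```

Because every rule result carries the message, segment and channel in its key, results from different segments cannot be mixed up when they are collected. A key with no rules (here `seg_b` / `mobile`) never appears in `results`.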

Here is what I have so far:

import apache_beam as beam

# options, getsegments, GetSubscribersWithRules, ProcessSubscribersSegments
# and print_windows are defined elsewhere in my code.
testP = beam.Pipeline(options=options)

# Read each event from Pub/Sub and look up the segments it relates to.
ReadData = (
    testP
    | 'ReadData' >> beam.io.ReadFromPubSub(subscription=str(options.pubsubsubscriber.get())).with_output_types(bytes)
    | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
    | 'GetSegments' >> beam.ParDo(getsegments(options))
)

# Email and mobile are independent branches over the same input.
processEmails = (
    ReadData
    | 'GetSubscribersWithRulesForEmails' >> beam.ParDo(GetSubscribersWithRules(options, 'email'))
    | 'ProcessSubscribersSegmentsForEmails' >> beam.ParDo(ProcessSubscribersSegments(options, 'email'))
)
processMobiles = (
    ReadData
    | 'GetSubscribersWithRulesForMobiles' >> beam.ParDo(GetSubscribersWithRules(options, 'mobile'))
    | 'ProcessSubscribersSegmentsForMobiles' >> beam.ParDo(ProcessSubscribersSegments(options, 'mobile'))
)

# For the sake of testing, only the window for email is written.
windowThis = (
    processEmails
    | beam.WindowInto(
        beam.window.FixedWindows(1),
        trigger=beam.transforms.trigger.Repeatedly(
            beam.transforms.trigger.AfterProcessingTime(1 * 10)),
        accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
    | beam.CombinePerKey(beam.combiners.ToListCombineFn())
    | beam.ParDo(print_windows)
)
Can you elaborate on how far you got in your quest, and what is not working? I believe processMobiles and processEmails are correct, right? Do you want to window a series of events and write them to MySQL per window? - Pablo
processEmails and processMobiles are correct. Both functions have a list of rules that must each be evaluated to true or false. This can be done inside the same function, but then it will not be parallelized. Instead, I would like to yield each rule, process it in another ParDo, and make sure that all rules from the previous function are collected in a final function so I can compare them with additional logic. - user3241849
I believe this can only be done with windowing, but I am not sure whether it is possible to open a window for each segment's rules, close it after the last rule of the segment is processed, and collect them in the final function. I believe a time-based window will also collect rules from other segments, so I will miss some crucial data: the segment data will be corrupted by missing rules. - user3241849
It sounds like you want to group all of the data belonging to the same Pub/Sub message together, right? You may not need any windowing in this case. - Pablo
Yes, basically it's one Pub/Sub message split into multiple elements that are processed by different ParDos, and at the end I would like to collect all elements from the whole pipeline that came from the same Pub/Sub message and run some aggregation on them. - user3241849

1 Answer


In this case, because all of the elements that come from one message have the exact same timestamp, I would use their message ID and their timestamp to group them with session windows. It would be something like this:

testP = beam.Pipeline(options=options)
ReadData = (
    testP
    | 'ReadData' >> beam.io.ReadFromPubSub(subscription=str(options.pubsubsubscriber.get())).with_output_types(bytes)
    | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
    | 'GetSegments' >> beam.ParDo(getsegments(options))
)

# At this point, ReadData contains (key, value) pairs with a timestamp.
# Now we perform all of the processing (steps elided, same as in the question).
processEmails = (ReadData | ...)
processMobiles = (ReadData | ...)

# Now we window by sessions with a 1-second gap. This is okay because all of
# the elements for any given key have the exact same timestamp.
windowThis = (
    processEmails
    | beam.WindowInto(beam.window.Sessions(1))  # Default trigger is fine
    | beam.CombinePerKey(beam.combiners.ToListCombineFn())
    | beam.ParDo(print_windows)
)
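
To see why a 1-second gap is enough, here is a simplified plain-Python model of per-key session grouping. It is not Beam's implementation, only a sketch of the merging idea: each element opens a window `[timestamp, timestamp + gap)`, and overlapping windows for the same key are merged. The function name `sessionize` and the sample keys and values are hypothetical.

```python
from collections import defaultdict


def sessionize(elements, gap_seconds):
    """Group (key, timestamp, value) triples into per-key sessions.

    An element joins the current session of its key if its timestamp falls
    before the session's end; otherwise it starts a new session.
    """
    by_key = defaultdict(list)
    for key, timestamp, value in elements:
        by_key[key].append((timestamp, value))

    sessions = defaultdict(list)
    for key, items in by_key.items():
        items.sort(key=lambda item: item[0])  # order by timestamp
        session_end = None
        for timestamp, value in items:
            if session_end is None or timestamp >= session_end:
                sessions[key].append([])  # start a new session
                session_end = timestamp + gap_seconds
            else:
                # Extend the session to cover this element's own window.
                session_end = max(session_end, timestamp + gap_seconds)
            sessions[key][-1].append(value)
    return dict(sessions)


elements = [
    ("msg-1", 100.0, "rule_1=True"),
    ("msg-1", 100.0, "rule_2=False"),
    ("msg-2", 100.0, "rule_1=True"),
    ("msg-1", 105.0, "late_rule=True"),  # same key reused 5 seconds later
]
sessions = sessionize(elements, gap_seconds=1)
```

Elements with the same key and the same timestamp always land in one session, and elements with a different key never share one, even when their timestamps are identical. The grouping only breaks down if the key is not unique per message, or if elements of one message end up with timestamps more than the gap apart.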