I am new to Dataflow. I have the following logic:
- An event is added to Pub/Sub.
- Dataflow reads from Pub/Sub and gets the event.
- Using the event, I query MySQL to find which segments the event is related to, and this step returns the list of relations. These segments are independent of one another.
- Each segment's MySQL results can be divided into two tables, one for email and one for mobile, and these are independent as well.
- Each segment has 1 to n rules. I would like to process the rules in parallel and collect all the results. I have tried to use windows, but I am not sure how to write the logic so that the combined results from all rules inside one segment are collected in a final function, which then writes the outcome to MySQL depending on the rule results (booleans).
Here is what I have so far:
testP = beam.Pipeline(options=options)

ReadData = (
    testP
    | 'ReadData' >> beam.io.ReadFromPubSub(subscription=str(options.pubsubsubscriber.get())).with_output_types(bytes)
    | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
    | 'GetSegments' >> beam.ParDo(getsegments(options))
)

processEmails = (
    ReadData
    | 'GetSubscribersWithRulesForEmails' >> beam.ParDo(GetSubscribersWithRules(options, 'email'))
    | 'ProcessSubscribersSegmentsForEmails' >> beam.ParDo(ProcessSubscribersSegments(options, 'email'))
)

processMobiles = (
    ReadData
    | 'GetSubscribersWithRulesForMobiles' >> beam.ParDo(GetSubscribersWithRules(options, 'mobile'))
    | 'ProcessSubscribersSegmentsForMobiles' >> beam.ParDo(ProcessSubscribersSegments(options, 'mobile'))
)

# For the sake of testing, only the window for email is written
windowThis = (
    processEmails
    | beam.WindowInto(
        beam.window.FixedWindows(1),
        trigger=beam.transforms.trigger.Repeatedly(
            beam.transforms.trigger.AfterProcessingTime(1 * 10)),
        accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
    | beam.CombinePerKey(beam.combiners.ToListCombineFn())
    | beam.ParDo(print_windows)
)
processMobiles and processEmails are correct, right? Do you want to window a series of events and write them to MySQL per window? - Pablo
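To make the "collect all rule results for one segment, then decide" step concrete, here is a minimal sketch. It assumes, hypothetically, that the rule-processing step emits `(segment_id, rule_passed)` pairs, which is also what `CombinePerKey` needs as input. The class `AllRulesPass`, the helper `combine_per_key`, and the sample segment ids are made up for illustration. The class mirrors the four-method shape of a Beam `CombineFn` (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`) but is written in plain Python so it runs without Beam installed.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Tuple


@dataclass
class RuleAccumulator:
    """Running state for one segment: the rule outcomes seen so far."""
    results: List[bool] = field(default_factory=list)


class AllRulesPass:
    """Hypothetical combiner with the same four methods as a Beam CombineFn."""

    def create_accumulator(self) -> RuleAccumulator:
        return RuleAccumulator()

    def add_input(self, acc: RuleAccumulator, rule_passed: bool) -> RuleAccumulator:
        acc.results.append(rule_passed)
        return acc

    def merge_accumulators(self, accumulators: Iterable[RuleAccumulator]) -> RuleAccumulator:
        # Partial results computed on different workers are merged here.
        merged = RuleAccumulator()
        for acc in accumulators:
            merged.results.extend(acc.results)
        return merged

    def extract_output(self, acc: RuleAccumulator) -> Dict[str, object]:
        # One decision per segment: it passes only if every rule passed.
        return {"rules_seen": len(acc.results), "all_passed": all(acc.results)}


def combine_per_key(pairs: Iterable[Tuple[str, bool]], fn: AllRulesPass) -> Dict[str, Dict[str, object]]:
    """Local stand-in for a per-key combine: one accumulator per segment id."""
    accs: Dict[str, RuleAccumulator] = {}
    for key, value in pairs:
        acc = accs.get(key)
        if acc is None:
            acc = fn.create_accumulator()
        accs[key] = fn.add_input(acc, value)
    return {key: fn.extract_output(acc) for key, acc in accs.items()}


# Assumed sample data: two segments with two rule results each.
rule_results = [
    ("segment-1", True),
    ("segment-2", True),
    ("segment-1", False),
    ("segment-2", True),
]
decisions = combine_per_key(rule_results, AllRulesPass())
```

In the pipeline itself, the same class would presumably subclass `beam.CombineFn` and be applied as `beam.CombinePerKey(AllRulesPass())` after the `WindowInto` step, with a final `ParDo` writing each per-segment decision to MySQL. Whether "all rules pass" is the right reduction depends on the segment logic, which the question does not spell out.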