I'm building a pipeline whose transformation logic depends on configuration data that changes whenever an update is triggered.
There are two Pub/Sub topics: topic A carries the IoT data, and topic B carries the configuration used to transform that data.
The configuration is kept in Cloud Firestore. Whenever the database is updated, a Cloud Function reads the new configuration and publishes it to topic B.
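For context, the Cloud Function side looks roughly like this (simplified sketch; the project/topic names and the payload handling are placeholders, not the exact code):

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# 'my-project' and 'topic-b' stand in for the real names.
topic_path = publisher.topic_path('my-project', 'topic-b')

def on_config_update(data, context):
    # A 1st-gen Firestore trigger delivers the new document under
    # data['value']['fields'], still in Firestore's event encoding.
    fields = data['value']['fields']
    publisher.publish(topic_path, json.dumps(fields).encode('utf-8'))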
The problem is that the Dataflow job only reads the configuration once, when the job starts, and the side input is never refreshed after that.
How can I make the side input pick up the updated configuration? Here is the relevant part of the pipeline:
import apache_beam as beam
from apache_beam.io import ReadFromPubSub, WriteToPubSub
from apache_beam.transforms import trigger, window


class Transform(beam.DoFn):
    # The main element comes first; the side input is the second argument.
    def process(self, element, configuration):
        ...
        yield output


def run():
    # options, TOPIC_A, TOPIC_B and TOPIC_C are defined elsewhere.
    p = beam.Pipeline(options=options)

    iot_data = (p
        | 'ReadIotData' >> ReadFromPubSub(topic=TOPIC_A))

    # Re-window the config stream into the global window and fire on
    # every element, so each new configuration message is emitted.
    configuration = (p
        | 'ReadConfig' >> ReadFromPubSub(topic=TOPIC_B)
        | 'WindowUserData' >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | 'JsonLoadsUserData' >> beam.Map(
            lambda x: ('data', x.decode().replace('\\', ''))))

    output = (iot_data
        | 'transform' >> beam.ParDo(Transform(),
                                    beam.pvalue.AsDict(configuration))
        | 'Output' >> WriteToPubSub(topic=TOPIC_C))

    p.run()
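In case it matters, the job is launched as a streaming Dataflow job; options is built along these lines (the project and region values are placeholders):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    streaming=True,           # required for the Pub/Sub sources
    runner='DataflowRunner',
    project='my-project',     # placeholder
    region='us-central1',     # placeholder
)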