I am trying to use Cloud Dataflow's Python API to write a simple program that accepts input from a Pub/Sub publisher, checks the input for a condition, and then outputs data to a topic. So far the program accepts and transforms input and publishes data to a topic. My problem is that I only want to publish to the topic when a condition is true. For instance, if a keyword is present in the JSON data, I want to publish a message to the Pub/Sub topic; if the keyword is absent, I don't want to publish anything. I tried adding a global boolean flag that becomes true when a keyword is found, and wrapped it around these lines:
output = (lines
          | 'format' >> beam.Map(format_result)
          | 'encode' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(six.binary_type))
output | beam.io.WriteStringsToPubSub(self.output_topic)
That did not work, and I'm running out of ideas. Does anyone know whether this can be done?
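The flag approach probably fails because the pipeline-building lines above run once, when the pipeline graph is constructed and before any message has arrived. A global set later, inside a transform running on a worker, cannot retroactively add or remove steps from that graph. One common alternative is to evaluate the condition per element with `beam.Filter`, so messages without the keyword are dropped before they reach the sink. The sketch below assumes the keyword is a top-level key of a JSON object and uses a hypothetical `KEYWORD = "alert"`. `has_keyword` and `publish_matching` are illustrative names, not part of Beam. The format, encode and sink steps are copied from the question unchanged.

```python
import json

# Hypothetical keyword: the question does not say which key is checked.
KEYWORD = "alert"


def has_keyword(message, keyword=KEYWORD):
    """Return True if the message is a JSON object with `keyword` as a top-level key."""
    if isinstance(message, bytes):
        # Pub/Sub payloads may arrive as bytes depending on the read transform.
        message = message.decode("utf-8")
    try:
        data = json.loads(message)
    except ValueError:
        # Malformed JSON never matches, so it is dropped instead of published.
        return False
    return isinstance(data, dict) and keyword in data


def publish_matching(lines, output_topic, format_result):
    """Insert a per-element filter in front of the question's format/encode/sink steps."""
    # Imported here so the predicate above needs only the standard library.
    import apache_beam as beam

    output = (lines
              # Evaluated for every message at run time, unlike a global flag.
              | 'filter' >> beam.Filter(has_keyword)
              | 'format' >> beam.Map(format_result)
              | 'encode' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes))
    output | beam.io.WriteStringsToPubSub(output_topic)
    return output
```

With this shape the sink is always part of the graph, but it only receives elements for which `has_keyword` returned `True`. If "keyword" means a substring of a value rather than a key, only the predicate needs to change.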