
I am trying to use Cloud Dataflow's Python API to write a simple program that reads messages from a Pub/Sub topic, checks each message for a condition, and publishes output to another topic. So far the program reads and transforms its input and publishes data to a topic. My problem is that I only want to publish when the condition is true. For instance, if a keyword is present in the JSON data, I want to publish a message to the Pub/Sub topic; if the keyword is absent, nothing should be published. I tried adding a global boolean flag that becomes true when the keyword is found, and wrapped it around these lines:

output = (lines
          | 'format' >> beam.Map(format_result)
          | 'encode' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(six.binary_type))
output | beam.io.WriteStringsToPubSub(self.output_topic)

That did not work, and I'm running out of ideas. Does anyone know whether this can be done?


Answer


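You can use a ParDo whose DoFn yields an element only when the condition is satisfied; a message for which it yields nothing is dropped and never reaches the write step. A global flag does not work here because the pipeline graph is built once, before any data flows, so an `if` wrapped around the transforms is evaluated only at construction time, not once per message. For example, consider the following message structure, where the publish field indicates whether the processed message should be output by the ParDo: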

{"publish":"true","body":"This message should be published"}
{"publish":"false","body":"This message should *NOT* be published"}

We yield the message body only when publish is set to "true":

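import apache_beam as beam

class FilterFn(beam.DoFn):
    def process(self, element):
        # Emit the body only for messages flagged for publishing;
        # yielding nothing drops the message from the output.
        if element['publish'] == 'true':
            yield element['body']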
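To see why the unpublished message disappears, here is a plain-Python sketch (no Beam required) that mimics `FilterFn.process` as a generator and runs it over the two sample messages. It assumes the messages arrive as JSON strings, as they do after `ReadStringsFromPubSub`; `filter_fn_process`, `run_locally` and `SAMPLE_MESSAGES` are illustrative names, not Beam APIs.

```python
import json

# The two sample messages from above, as raw JSON strings.
SAMPLE_MESSAGES = [
    '{"publish":"true","body":"This message should be published"}',
    '{"publish":"false","body":"This message should *NOT* be published"}',
]


def filter_fn_process(element):
    """Stand-in for FilterFn.process: a generator yielding zero or one items."""
    if element['publish'] == 'true':
        yield element['body']


def run_locally(messages):
    """Parse each message, then flatten whatever the stand-in DoFn yields,
    roughly what Map(json.loads) followed by ParDo(FilterFn()) does."""
    outputs = []
    for raw in messages:
        outputs.extend(filter_fn_process(json.loads(raw)))
    return outputs


print(run_locally(SAMPLE_MESSAGES))
# → ['This message should be published']
```

Because a ParDo flattens whatever each element's `process` call yields, an element that yields nothing simply contributes no output, which is what lets a ParDo act as a per-element filter.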

And the main pipeline code:

import json
import apache_beam as beam

# p is a streaming Pipeline; known_args.input/output are Pub/Sub topic paths
lines = p | 'Read messages' >> beam.io.ReadStringsFromPubSub(topic=known_args.input)
jsons = lines | 'Load into JSON' >> beam.Map(json.loads)
filtered = jsons | 'Filter messages' >> beam.ParDo(FilterFn())
filtered | 'Publish messages' >> beam.io.WriteStringsToPubSub(topic=known_args.output)
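The pipeline snippet assumes `p` and `known_args` already exist. One way to produce `known_args` is sketched below, modeled loosely on the Beam streaming examples; the `--input`/`--output` flag names are assumptions chosen to match `known_args.input` and `known_args.output`, and `parse_args` is a hypothetical helper.

```python
import argparse


def parse_args(argv=None):
    """Split command-line flags into our own topic arguments and the rest.

    Flags argparse does not recognise (e.g. --runner, --project) are returned
    in pipeline_args so they can be handed on to Beam's PipelineOptions.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True,
                        help='Input topic, e.g. projects/<PROJECT>/topics/<TOPIC>')
    parser.add_argument('--output', required=True,
                        help='Output topic, e.g. projects/<PROJECT>/topics/<TOPIC>')
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args
```

Pub/Sub is an unbounded source, so with Beam installed you would pass `pipeline_args` to `PipelineOptions`, set `options.view_as(StandardOptions).streaming = True`, and build the pipeline inside `with beam.Pipeline(options=options) as p:`.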

Pulling from a subscription on the output topic returns only one message:

This message should be published
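For the keyword case in the question, the same per-element check can be written as a predicate and passed to `beam.Filter`, which keeps an element only when the function returns True. The sketch below is plain Python so it can be tested without Beam. What "a keyword is present in the JSON data" means is an assumption here (the keyword occurs inside any key or any string value, at any depth), and `contains_keyword` is a hypothetical helper name.

```python
import json


def _contains(node, keyword):
    """Recursively search the keys and string values of parsed JSON."""
    if isinstance(node, dict):
        return any(keyword in key or _contains(value, keyword)
                   for key, value in node.items())
    if isinstance(node, list):
        return any(_contains(item, keyword) for item in node)
    if isinstance(node, str):
        return keyword in node
    # Numbers, booleans and null never match.
    return False


def contains_keyword(raw_message, keyword):
    """Predicate for beam.Filter: True if keyword occurs in the message's JSON."""
    return _contains(json.loads(raw_message), keyword)
```

In the pipeline this could replace the JSON and ParDo steps, e.g. `lines | 'Keep keyword' >> beam.Filter(contains_keyword, 'alert') | beam.io.WriteStringsToPubSub(topic=known_args.output)`, where `'alert'` is a placeholder keyword; `beam.Filter` passes extra positional arguments through to the predicate, and it forwards the original string, so the full message is published unchanged.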