0
votes

The objective of this pipeline is to see how Ptransform works in Pubsub to Pubsub Python pipeline. I am giving the following input but it is giving me the same input back in the output pubsub. The idea is to just get one field out of the incoming stream from pubsub and send only that field to the output topic.

{"field_1": "14726485", "field_2": "3947183"}

class ExtractStoreStock(beam.PTransform):
    """A transform to extract a field
    """
    def __init__(self, field):
        super(ExtractStoreStock, self).__init__()
        self.field = field

    def expand(self, pcoll):
        return (pcoll
                | beam.Map(lambda elem: (elem[self.field])))


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    global cloud_options
    global custom_options

    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True
    )
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as pipeline:
        messages = (
                pipeline
                | "Read from PubSub"
                >> beam.io.ReadFromPubSub(subscription=custom_options.inputSubscription)
        )

        get_stores = messages | "get_store" >> ExtractStoreStock('field_1')

        get_stores | "Write to PubSub" >> beam.io.WriteToPubSub(topic=custom_options.outputTopic)

    pipeline.run()


if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)

    run()

I am new to beam or google dataflow and I am confused about what to change in the transform to give me the desired result.

1

1 Answers

0
votes

You probably need to add a json.loads to parse the byte string you read from Pub/Sub.

messages | beam.Map(json.loads) | "get_store" >> ExtractStoreStock('field_1')

Also you can simplify it to:

get_stores = (messages 
              | beam.Map(json.loads) 
              | beam.Map(lambda x: x['field_1']))