
The objective of this pipeline is to see how a PTransform works in a Pub/Sub-to-Pub/Sub Python pipeline. The idea is to extract just one field from each message in the incoming Pub/Sub stream and send only that field to the output topic. However, when I publish the following input, the output topic gives me the same input back.

{"field_1": "14726485", "field_2": "3947183"}

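import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, SetupOptions)


class CustomPipelineOptions(PipelineOptions):
    # Assumed definition: the question uses these two options but does not show this class.
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--inputSubscription")
        parser.add_argument("--outputTopic")


class ExtractStoreStock(beam.PTransform):
    """A transform to extract a single field from each element."""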
    def __init__(self, field):
        super(ExtractStoreStock, self).__init__()
        self.field = field

    def expand(self, pcoll):
        return (pcoll
                | beam.Map(lambda elem: (elem[self.field])))

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    global cloud_options
    global custom_options

    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True)
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as pipeline:
        messages = (
                pipeline
                | "Read from PubSub"
                >> beam.io.ReadFromPubSub(subscription=custom_options.inputSubscription))

        get_stores = messages | "get_store" >> ExtractStoreStock('field_1')

        get_stores | "Write to PubSub" >> beam.io.WriteToPubSub(topic=custom_options.outputTopic)


if __name__ == "__main__":  # noqa
    run()


I am new to Apache Beam and Google Cloud Dataflow, and I am not sure what to change in the transform to get the desired result.


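ReadFromPubSub delivers each message payload as a raw byte string, not a dict, so looking up elem['field_1'] on it will not give you the field. You probably need to add a json.loads step to parse the payload first: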

messages | beam.Map(json.loads) | "get_store" >> ExtractStoreStock('field_1')
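To see why the parse step matters, here is a plain-Python sketch (no Beam needed) of what a single element looks like. It assumes the payload is the UTF-8 bytes of the JSON from the question, which is the element type ReadFromPubSub produces when with_attributes is left at its default of False.

```python
import json

# Assumed payload: the UTF-8 bytes of the question's sample message,
# i.e. what each element looks like right after ReadFromPubSub.
payload = b'{"field_1": "14726485", "field_2": "3947183"}'

# Indexing raw bytes with a string key fails: bytes only accept integer indices.
try:
    payload["field_1"]
except TypeError as exc:
    print("TypeError:", exc)

# json.loads accepts bytes (Python 3.6+) and returns a dict, so the lookup works.
record = json.loads(payload)
print(record["field_1"])  # prints 14726485
```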

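You can also drop the custom PTransform and chain Map steps directly. WriteToPubSub expects bytes, so the extracted string is encoded before it is written:

get_stores = (messages
              | "Parse JSON" >> beam.Map(json.loads)
              | "Get field_1" >> beam.Map(lambda x: x['field_1'])
              # WriteToPubSub needs bytes, so encode the extracted string.
              | "Encode" >> beam.Map(lambda s: s.encode('utf-8')))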
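Since WriteToPubSub expects every element to be bytes, the extracted value has to be encoded before the write step. The per-message logic (parse, pick the field, encode) can also live in one plain function, which is easy to unit-test outside a pipeline. This is a minimal sketch assuming UTF-8 JSON payloads; extract_field_as_bytes is a hypothetical helper name, not part of Beam.

```python
import json


def extract_field_as_bytes(payload: bytes, field: str) -> bytes:
    """Parse a JSON Pub/Sub payload and return one field as bytes.

    Hypothetical helper: assumes the payload is UTF-8 JSON and the field exists.
    """
    record = json.loads(payload)          # bytes -> dict
    value = record[field]                 # e.g. "14726485"
    return str(value).encode("utf-8")     # WriteToPubSub needs bytes, not str


# Inside the pipeline it could be plugged in like this (sketch, not run here);
# beam.Map forwards the extra "field_1" argument to the function:
#   (messages
#    | "get_store" >> beam.Map(extract_field_as_bytes, "field_1")
#    | "Write to PubSub" >> beam.io.WriteToPubSub(topic=custom_options.outputTopic))

if __name__ == "__main__":
    sample = b'{"field_1": "14726485", "field_2": "3947183"}'
    print(extract_field_as_bytes(sample, "field_1"))  # prints b'14726485'
```

Calling str(value) before encoding means a numeric field would also be sent as its text form; that is a choice made for this sketch, not something the pipeline requires.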