The objective of this pipeline is to understand how a PTransform works in a Pub/Sub-to-Pub/Sub Python pipeline. I publish the following input message, but the output topic receives the same message back unchanged. The idea is to extract a single field from the incoming Pub/Sub stream and send only that field to the output topic.
{"field_1": "14726485", "field_2": "3947183"}
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions, SetupOptions

# CustomPipelineOptions is my own options class (defined elsewhere); it exposes
# inputSubscription and outputTopic.


class ExtractStoreStock(beam.PTransform):
    """A transform to extract a single field from each element."""

    def __init__(self, field):
        super(ExtractStoreStock, self).__init__()
        self.field = field

    def expand(self, pcoll):
        # Map each element to just the value of the configured field.
        return pcoll | beam.Map(lambda elem: elem[self.field])
def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    global cloud_options
    global custom_options

    pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read messages from the input Pub/Sub subscription.
        messages = (
            pipeline
            | "Read from PubSub"
            >> beam.io.ReadFromPubSub(subscription=custom_options.inputSubscription)
        )

        # Keep only the configured field from each message.
        get_stores = messages | "get_store" >> ExtractStoreStock('field_1')

        # Publish the extracted values to the output topic.
        get_stores | "Write to PubSub" >> beam.io.WriteToPubSub(
            topic=custom_options.outputTopic
        )

        pipeline.run()


if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)
    run()
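For reference, a minimal way to exercise the transform in isolation with the DirectRunner (a sketch that assumes the ExtractStoreStock class above is in scope and feeds it an already-parsed dict, rather than the raw Pub/Sub payload the real pipeline receives):

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.Create([{"field_1": "14726485", "field_2": "3947183"}])
        | ExtractStoreStock("field_1")
        | beam.Map(print)  # prints: 14726485
    )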
I am new to Beam and Google Dataflow, and I am not sure what I need to change in the transform to get the desired result.