0 votes

I have a program that creates a topic in Pub/Sub and publishes messages to it. I also have an automated Dataflow job (created from a template) that saves these messages into my BigQuery table. I now intend to replace the template-based job with a Python pipeline that reads data from Pub/Sub, applies transformations, and saves the data into BigQuery or publishes it to another Pub/Sub topic. I started writing the script in Python and went through a lot of trial and error, but I could not get it to work. The code looks like this:

import apache_beam as beam
from apache_beam.io import WriteToText
TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"

def run():
    o = beam.options.pipeline_options.PipelineOptions()
    p = beam.Pipeline(options=o)

    print("I reached here")
    # # Read from PubSub into a PCollection.
    data = (
        p
        | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
    )
    data | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
    print("Lines: ", data)
run()

I would really appreciate any help with this. Note: I have my project set up on Google Cloud and I am running the script locally.


2 Answers

4 votes

Here is the working code.

import apache_beam as beam

TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"


class PrintValue(beam.DoFn):
    def process(self, element):
        print(element)
        return [element]

def run():

    o = beam.options.pipeline_options.PipelineOptions()
    # Can also be set by passing the --streaming execution param
    standard_options = o.view_as(beam.options.pipeline_options.StandardOptions)
    standard_options.streaming = True
    p = beam.Pipeline(options=o)

    print("I reached here")
    # Read from Pub/Sub, print each element, and republish it to the output topic.
    data = p | beam.io.ReadFromPubSub(topic=TOPIC_PATH) | beam.ParDo(PrintValue()) | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
    # Don't forget to run the pipeline!
    result = p.run()
    result.wait_until_finish()

run()

In summary

  • You never ran the pipeline. Beam is a graph programming model: your previous code built the graph but never executed it. Here, at the end, the pipeline is run (a non-blocking call) and then we wait for it to finish (a blocking call).
  • When you start the pipeline, Beam will tell you that Pub/Sub only works in streaming mode. You can therefore start your code with the --streaming param, or set it programmatically as shown in the code above (see the sketch below for the flag-based variant).
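If you prefer the flag-based approach, here is a minimal sketch; the list passed to PipelineOptions stands in for your real command-line arguments:

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Passing "--streaming" here is equivalent to launching the script with that flag.
options = PipelineOptions(["--streaming"])
print(options.view_as(StandardOptions).streaming)  # prints True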

Be careful: streaming mode means listening indefinitely on Pub/Sub. If you run this on Dataflow, your pipeline stays up until you stop it, which can be expensive if you have few messages. Be sure that is the model you want.

An alternative is to run your pipeline for a limited period of time (one scheduler starts it, another stops it). In that case, you need messages to stack up in the meantime. Here you use a topic as the pipeline input, which forces Beam to create a temporary subscription and listen for messages on it. Messages published before that subscription was created will not be received or processed.

The idea is instead to create a subscription yourself; that way, messages are stacked in it (for up to 7 days by default). Then use the subscription name as the pipeline input: beam.io.ReadFromPubSub(subscription=SUB_PATH). The messages will be unstacked and processed by Beam (order not guaranteed!).
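For the subscription-based variant, a minimal sketch (the subscription path below is hypothetical; create the subscription beforehand so messages accumulate in it):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical subscription, created ahead of time (e.g. with gcloud pubsub subscriptions create).
SUB_PATH = "projects/test-pipeline-253103/subscriptions/test-pipeline-sub"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"

options = PipelineOptions(["--streaming"])
p = beam.Pipeline(options=options)

(p
 | beam.io.ReadFromPubSub(subscription=SUB_PATH)  # read from the subscription, not the topic
 | beam.io.WriteToPubSub(topic=OUTPUT_PATH))

result = p.run()
result.wait_until_finish()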

0 votes

Based on the Beam programming guide, you simply have to add a transform step to your pipeline. Here is an example of a transform:

class PrintValue(beam.DoFn):
  def process(self, element):
    print(element)
    return [element]

Add it to your pipeline:

data | beam.ParDo(PrintValue()) | beam.io.WriteToPubSub(topic=OUTPUT_PATH)

You can add as many transforms as you want. You can test the value of each element and emit it to a tagged PCollection (to get multiple outputs) for fan-out, or use side inputs for fan-in. A sketch of the tagged-output approach follows.
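For illustration, here is a minimal sketch of tagged outputs, assuming data is the PCollection from the snippet above and splitting elements by message size (the long/short tags and the 100-byte threshold are made up):

import apache_beam as beam
from apache_beam import pvalue

class SplitBySize(beam.DoFn):
    # Hypothetical transform: route large messages to a tagged output, the rest to the main output.
    def process(self, element):
        if len(element) > 100:
            yield pvalue.TaggedOutput("long", element)
        else:
            yield element

results = data | beam.ParDo(SplitBySize()).with_outputs("long", main="short")
results.short | "Republish short" >> beam.io.WriteToPubSub(topic=OUTPUT_PATH)
results.long | "Print long" >> beam.Map(print)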