I have a pipeline that streams JSON messages from Pub/Sub (an unbounded PCollection) to Google Cloud Storage. Each file should contain multiple JSON objects, one per line.

I want to create another pipeline that reads all the JSON objects from this GCS bucket for further stream processing. The most important thing is that this second pipeline should work as a stream rather than a batch: I want it to "listen" to the bucket and process every JSON object written to it, as an unbounded PCollection.

Is there any way to achieve this behavior?

Thanks

2 Answers

The streaming source only works with Pub/Sub. But don't worry, you can still achieve your pipeline.
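
For example, here is a minimal sketch, assuming Cloud Storage Pub/Sub notifications are enabled on the bucket and publish to a subscription named gcs-notifications (the project and subscription names are placeholders, not something from this question): read each notification, turn it into the gs:// path of the new object, and read that file's lines as a stream.

Pipeline pipeline = Pipeline.create(options);

pipeline
    // Each Cloud Storage notification carries the bucket and object name as attributes.
    .apply("Read GCS notifications",
        PubsubIO.readMessagesWithAttributes()
            .fromSubscription("projects/my-project/subscriptions/gcs-notifications"))
    // Build the gs:// path of the newly written file from those attributes.
    // (In practice you would also filter on the OBJECT_FINALIZE event type.)
    .apply("To file path", MapElements
        .into(TypeDescriptors.strings())
        .via((PubsubMessage msg) ->
            "gs://" + msg.getAttribute("bucketId") + "/" + msg.getAttribute("objectId")))
    // Expand each path and read the file, emitting one element per JSON line.
    .apply("Match files", FileIO.matchAll())
    .apply("Read matches", FileIO.readMatches())
    .apply("Read JSON lines", TextIO.readFiles());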

Another user provided a good answer that tells you how to do what you want. However, if I understand your problem correctly, I think I can recommend a cleaner approach.

Assuming that you want to:

  • Accept a Pub/Sub message at the beginning of your pipeline
  • Process the message by windowing it into a big window of messages
  • Write each window of messages into a GCS bucket as a file
  • In addition to the windowing described above, process each line in another way

Then you can instead create one pipeline that simply forks after the "accept Pub/Sub message" step. Dataflow supports this natively and very well. You would save a reference to the PCollection returned by the Pub/Sub source at the beginning of your pipeline, and then apply multiple chains of transforms (DoFn implementations, etc.) to that one reference. You'll be able to do the windowed writes to GCS like you do now, plus process each individual message in any way you like.

It might look like this:

Pipeline pipeline = Pipeline.create(options);

PCollection<String> messages = pipeline.apply("Read from Pub/Sub", PubsubIO.readStrings().fromTopic("my_topic_name"));

// Current pipeline fork for windowing into GCS
messages.apply("Handle for GCS", ...);

// New fork for more handling
messages.apply("More stuff", ...);