3
votes

I am trying to build a pipeline on Google Cloud Dataflow that would do the following:

  • Listen for events on a Pub/Sub subscription
  • Extract the file name from the event text
  • Read the file (from a Google Cloud Storage bucket)
  • Store the records in BigQuery

Following is the code:

Pipeline pipeline = //create pipeline
pipeline.apply("read events", PubsubIO.readStrings().fromSubscription("sub"))
        .apply("Deserialise events", //Code that produces ParDo.SingleOutput<String, KV<String, byte[]>>)
        .apply(TextIO.read().from("")) // ??? this is where I am stuck

I am struggling with the 3rd step; I am not quite sure how to access the output of the second step and use it in the third. I have tried writing code that produces the following:

private ParDo.SingleOutput<KV<String, byte[]>, TextIO.Read> readFile(){
    //A class that extends DoFn<KV<String, byte[]>, TextIO.Read> and has TextIO.read wrapped into processElement method
}

However, I am not able to read the file content in the subsequent step.

Could anyone please let me know what I need to write in the 3rd and 4th steps so that I can consume the file line by line and store the output in BigQuery (or just log it)?


2 Answers

4
votes

The natural way to express your read would be to use the TextIO.readAll() method, which reads text files from an input PCollection of file names. This method has been introduced in the Beam codebase but is not in a released version yet; it will be included in the Beam 2.2.0 release and the corresponding Dataflow 2.2.0 release.
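For illustration, here is a minimal sketch of how the full pipeline could look once TextIO.readAll() is available. The ExtractFileNameFn and LineToTableRowFn DoFns, the subscription name, and the BigQuery table are placeholders you would replace with your own logic, and the write assumes the destination table already exists:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class PubsubFileToBigQuery {

  // Hypothetical: extract the GCS path (e.g. "gs://my-bucket/events/data.csv") from the event text.
  static class ExtractFileNameFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element().trim()); // replace with your own parsing of the event payload
    }
  }

  // Hypothetical: map one line of the file to a BigQuery TableRow.
  static class LineToTableRowFn extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(new TableRow().set("line", c.element()));
    }
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply("Read events", PubsubIO.readStrings().fromSubscription("sub"))
        .apply("Extract file names", ParDo.of(new ExtractFileNameFn()))
        .apply("Read files", TextIO.readAll())            // emits one element per line of each file
        .apply("To TableRows", ParDo.of(new LineToTableRowFn()))
        .apply("Write to BigQuery",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")      // assumes this table already exists
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    pipeline.run();
  }
}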

-1
votes

You can get this done using a SerializableFunction.

You can do

pipeline.apply(TextIO.read().from(new FileNameFn()));

public class FileNameFn implements SerializableFunction<inputFileNameString, outputQualifiedFileNameStringWithBucket>

Obviously, you can pass the bucket name and other parameters statically when creating this class instance via constructor arguments.
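For illustration, a minimal sketch of what such a class could look like (the bucket field and the "gs://" prefixing are assumptions about how the qualified path is built; adjust to your own naming scheme):

import org.apache.beam.sdk.transforms.SerializableFunction;

// Sketch: maps a bare file name to a fully qualified GCS path,
// with the bucket supplied statically through the constructor.
public class FileNameFn implements SerializableFunction<String, String> {

  private final String bucket; // e.g. "my-bucket"

  public FileNameFn(String bucket) {
    this.bucket = bucket;
  }

  @Override
  public String apply(String fileName) {
    return "gs://" + bucket + "/" + fileName;
  }
}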

Hope this will help.