1 vote

I want to build a streaming Apache Beam pipeline in GCP that reads data from Google Pub/Sub and pushes it to GCS. I have the part that reads the data from Pub/Sub working. My current code looks like this (I picked it up from one of the GCP Apache Beam templates):

pipeline.apply("Read PubSub Events",
        PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
    .apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
    .apply(
        options.getWindowDuration() + " Window",
        Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
    .apply(
        "Write File(s)",
        AvroIO.write(AdEvent.class)
            .to(
                new WindowedFilenamePolicy(
                    options.getOutputDirectory(),
                    options.getOutputFilenamePrefix(),
                    options.getOutputShardTemplate(),
                    options.getOutputFilenameSuffix()))
            .withTempDirectory(NestedValueProvider.of(
                options.getAvroTempDirectory(),
                (SerializableFunction<String, ResourceId>) input ->
                    FileBasedSink.convertToFileResourceIfPossible(input)))
            .withWindowedWrites()
            .withNumShards(options.getNumShards()));

It produces files that look like this: windowed-file2020-04-28T09:00:00.000Z-2020-04-28T09:02:00.000Z-pane-0-last-00-of-01.avro

I want to store the data in GCS in dynamically created directories such as 2020-04-28/01, 2020-04-28/02, etc., where 01 and 02 are subdirectories denoting the hour of the day when the data was processed by the Dataflow streaming pipeline.

Example:

gs://data/2020-04-28/01/0000000.avro
gs://data/2020-04-28/01/0000001.avro
gs://data/2020-04-28/01/....

gs://data/2020-04-28/02/0000000.avro
gs://data/2020-04-28/02/0000001.avro
gs://data/2020-04-28/02/....

gs://data/2020-04-28/03/0000000.avro
gs://data/2020-04-28/03/0000001.avro
gs://data/2020-04-28/03/....
...

The 0000000, 0000001, etc. are simple file names used for illustration; I do not expect the files to be sequentially named. Do you think this is possible in a GCP Dataflow streaming setup?


3 Answers

3 votes

You can implement your own FilenamePolicy (perhaps using WindowedFilenamePolicy as a starting point) to apply your own logic for defining output paths. You can use / characters in your file paths as you wish (by the way, GCS buckets are "flat"; they don't really have directories). To get the dates/times, the windowedFilename method takes the window information as an argument, so you can use it in your return value however you see fit.
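As a rough illustration, a minimal sketch of such a policy for the yyyy-MM-dd/HH layout from the question might look like the following (the class name, the shard-naming scheme, and the hardcoded .avro suffix are my own choices, not something from the Beam SDK or the template):

    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
    import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    /** Writes windowed files under <baseDirectory>/yyyy-MM-dd/HH/. */
    class HourlyFilenamePolicy extends FilenamePolicy {

      private static final DateTimeFormatter DAY =
          DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();
      private static final DateTimeFormatter HOUR =
          DateTimeFormat.forPattern("HH").withZoneUTC();

      private final ResourceId baseDirectory;

      HourlyFilenamePolicy(String baseDirectory) {
        this.baseDirectory = FileSystems.matchNewResource(baseDirectory, /* isDirectory= */ true);
      }

      @Override
      public ResourceId windowedFilename(
          int shardNumber,
          int numShards,
          BoundedWindow window,
          PaneInfo paneInfo,
          OutputFileHints outputFileHints) {
        // Derive the date/hour "directories" from the window's start time.
        IntervalWindow intervalWindow = (IntervalWindow) window;
        String day = DAY.print(intervalWindow.start());
        String hour = HOUR.print(intervalWindow.start());
        // Include the window bounds so files from different windows in the same hour don't collide.
        String filename = String.format(
            "%d-%d-%03d-of-%03d.avro",
            intervalWindow.start().getMillis(),
            intervalWindow.end().getMillis(),
            shardNumber,
            numShards);
        return baseDirectory
            .resolve(day, StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve(hour, StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
      }

      @Override
      public ResourceId unwindowedFilename(
          int shardNumber, int numShards, OutputFileHints outputFileHints) {
        throw new UnsupportedOperationException("Only windowed writes are supported.");
      }
    }

You would then plug it into the existing pipeline with AvroIO.write(AdEvent.class).to(new HourlyFilenamePolicy("gs://data/")), keeping the withTempDirectory, withWindowedWrites and withNumShards calls from the question as they are.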

3 votes

You need to use writeDynamic instead of write. Unfortunately, AvroIO doesn't support writeDynamic natively, as mentioned here; instead you would need to use FileIO.

Below is a sample implementation in Scala using Scio:

    val dynamicOutput: FileIO.Write[String, GenericRecord] = FileIO
      .writeDynamic[String, GenericRecord]()
      .by((input: GenericRecord) => {
        input.get("id").toString.toUpperCase  + "/"
      })
      .withDestinationCoder(StringUtf8Coder.of())
      .withNumShards(1) // Since input is small, restrict to one file per bucket
      .withNaming(
        new SerializableFunction[String, FileNaming] {
          override def apply(partitionCol: String): FileNaming = {
            FileIO.Write.defaultNaming(s"Id=$partitionCol", ".parquet")
          }
        }
      )
      .via(Contextful.fn[GenericRecord,GenericRecord](
          new SerializableFunction[GenericRecord,GenericRecord]{
            override def apply(input: GenericRecord): GenericRecord = {
              val r = new GenericData.Record(outputSchema)
              r.put("amount",input.get("amount"))
              r.put("name",input.get("name"))
              r.put("type",input.get("type"))
              r
            }
          }
        ),
        ParquetIO.sink(outputSchema)
      )
      .to("gs://bucket-name/table-name")

In the above example I am using the GenericRecord type with a specified schema, creating dynamic partitions, and writing the files in Parquet format. You can choose to write the data in any format.
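Applying the same idea to the Avro layout from the question, a rough Java sketch might look like the following. Here events is assumed to be the already windowed PCollection<AdEvent> from the question's pipeline, the destination key is derived from the processing-time hour (UTC), and the "output" prefix is an arbitrary choice of mine:

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.io.FileIO;
    import org.joda.time.Instant;
    import org.joda.time.format.DateTimeFormat;

    // "events" is the windowed PCollection<AdEvent> built earlier in the question's pipeline.
    events.apply("Write File(s)",
        FileIO.<String, AdEvent>writeDynamic()
            // Destination key, e.g. "2020-04-28/01" (processing-time hour, UTC).
            .by(event -> DateTimeFormat.forPattern("yyyy-MM-dd/HH").withZoneUTC().print(Instant.now()))
            .withDestinationCoder(StringUtf8Coder.of())
            // Serialize each element with the standard Avro sink for the AdEvent schema.
            .via(AvroIO.sink(AdEvent.class))
            .to("gs://data")  // base output directory from the question
            // Each destination key becomes a subpath under the base directory.
            .withNaming(hour -> FileIO.Write.defaultNaming(hour + "/output", ".avro"))
            .withNumShards(options.getNumShards()));

If you would rather partition by the window's time instead of the processing time, you could keep the destination key constant and build the date/hour path inside a custom FileNaming, since its getFilename method also receives the window.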

0 votes

You can use the Pub/Sub to Cloud Storage Avro template, which is a streaming pipeline that reads data from a Pub/Sub topic and writes Avro files into the specified Cloud Storage bucket. This pipeline supports an optional user-provided window duration used to perform windowed writes.