
Context: I have a pipeline that listens to Pub/Sub. The messages are published to Pub/Sub by an object change notification from Google Cloud Storage. The pipeline processes each file with XmlIO, splitting it into records; so far so good.

The problem: the Pub/Sub message (and the object stored in Google Cloud Storage) carries some metadata that I would like to merge with the data from XmlIO to compose the elements that the pipeline will process. How can I achieve this?

Are you using any windowing/triggering within the pipeline? - Lukasz Cwik
I am not using windowing/triggering - Felipe

2 Answers


You can create a custom window and WindowFn that store the metadata from the Pub/Sub message that you want to use later to enrich the individual records.

Your pipeline will look as follows:

ReadFromPubsub -> Window.into(CopyMetadataToCustomWindowFn) -> ParDo(ExtractFilenameFromPubsubMessage) -> XmlIO -> ParDo(EnrichRecordsWithWindowMetadata) -> Window.into(FixedWindows.of(...))

To start, create a subclass of IntervalWindow that stores the metadata you need. Then create a subclass of WindowFn whose #assignWindows(...) copies the metadata from the Pub/Sub message into an instance of that IntervalWindow subclass. The window subclass should override equals() and hashCode() to include the metadata, and the WindowFn must return a Coder for it from #windowCoder(). Apply the new WindowFn with the Window.into(...) transform. Every record that flows through the XmlIO transform will then be in one of your custom windows, which carries the metadata.
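The equality requirement is the easiest part to get wrong, so here is a dependency-free model of the state such a window carries. It is only a sketch: `MetadataWindow` and its 1 ms assignment rule are hypothetical, and in a real pipeline the class would extend Beam's `IntervalWindow` and be paired with a window Coder.

```java
import java.util.Map;
import java.util.Objects;

// Beam-free model of the custom window. In Beam this class would extend
// org.apache.beam.sdk.transforms.windowing.IntervalWindow; the Coder the
// WindowFn needs for it is omitted here.
final class MetadataWindow {
  final long startMillis;              // inclusive
  final long endMillis;                // exclusive
  final Map<String, String> metadata;  // e.g. the Pub/Sub message attributes

  MetadataWindow(long startMillis, long endMillis, Map<String, String> metadata) {
    this.startMillis = startMillis;
    this.endMillis = endMillis;
    this.metadata = Map.copyOf(metadata); // immutable copy (Java 10+)
  }

  // Hypothetical assignment rule, standing in for WindowFn#assignWindows:
  // a 1 ms window at the message timestamp that carries the attributes.
  static MetadataWindow assign(long timestampMillis, Map<String, String> attributes) {
    return new MetadataWindow(timestampMillis, timestampMillis + 1, attributes);
  }

  // Equality includes the metadata. If it compared only the interval, two
  // files announced at the same instant would be treated as the same window,
  // and one file's metadata could end up attached to the other file's records.
  @Override
  public boolean equals(Object o) {
    if (!(o instanceof MetadataWindow)) {
      return false;
    }
    MetadataWindow w = (MetadataWindow) o;
    return startMillis == w.startMillis
        && endMillis == w.endMillis
        && metadata.equals(w.metadata);
  }

  @Override
  public int hashCode() {
    return Objects.hash(startMillis, endMillis, metadata);
  }
}
```

Two notifications with the same timestamp but different `objectId` attributes produce two distinct windows, which is the property the enrichment step relies on.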

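For the second step, extract the relevant filename from the Pub/Sub message and pass it to the XmlIO transform as input (for example through FileIO.matchAll(), FileIO.readMatches() and XmlIO.readFiles(), since XmlIO.read() takes a file pattern that is fixed when the pipeline is built).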
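A minimal sketch of the filename extraction, assuming the messages use the Cloud Storage Pub/Sub notification format, whose attributes include `bucketId`, `objectId` and `eventType`. In Beam the attribute map would come from `PubsubMessage#getAttributeMap()` when reading with `PubsubIO.readMessagesWithAttributes()`; here it is a plain `Map` so the example has no dependencies, and `NotificationPaths` is a hypothetical helper name.

```java
import java.util.Map;

// Hypothetical helper for turning notification attributes into a gs:// path.
final class NotificationPaths {
  // Builds the gs:// path of the object the notification refers to.
  static String gcsPath(Map<String, String> attributes) {
    String bucket = attributes.get("bucketId");
    String object = attributes.get("objectId");
    if (bucket == null || object == null) {
      throw new IllegalArgumentException("missing bucketId/objectId attribute");
    }
    return "gs://" + bucket + "/" + object;
  }

  // OBJECT_FINALIZE is sent when an object (or a new generation of it) is
  // written; deletes and metadata updates probably should not reach XmlIO.
  static boolean isNewObject(Map<String, String> attributes) {
    return "OBJECT_FINALIZE".equals(attributes.get("eventType"));
  }
}
```

In the pipeline, a ParDo could filter on `isNewObject` and emit `gcsPath(...)` as the filename for the file-reading transforms.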

For the third step, extract the custom metadata from the window in a ParDo/DoFn placed after XmlIO. Records produced by XmlIO keep the windowing information of the input that was passed through it (not all transforms do this, but almost all do). You can declare that your DoFn needs the window by adding a window parameter to its @ProcessElement method, for example:

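class EnrichRecordsWithWindowMetadata extends DoFn<XmlRecord, EnrichedRecord> {
  @ProcessElement
  public void processElement(
      @Element XmlRecord xmlRecord,
      MyCustomMetadataWindow metadataWindow,  // Beam injects the element's window
      OutputReceiver<EnrichedRecord> out) {
    // EnrichedRecord and getMetadata() are placeholders for your own types.
    out.output(new EnrichedRecord(xmlRecord, metadataWindow.getMetadata()));
  }
}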
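The body of that DoFn is ordinary Java once the window is in hand. The sketch below models both the parsed XML record and the window's metadata as string maps; the `Enrichment` class and the `meta.` key prefix are hypothetical choices for illustration, not part of XmlIO or Beam.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Beam-free sketch of what EnrichRecordsWithWindowMetadata computes.
final class Enrichment {
  // Copies the record's fields, then adds each metadata entry under a
  // hypothetical "meta." prefix so it is unlikely to collide with XML fields.
  static Map<String, String> enrich(Map<String, String> record,
                                    Map<String, String> windowMetadata) {
    Map<String, String> out = new LinkedHashMap<>(record);
    windowMetadata.forEach((key, value) -> out.put("meta." + key, value));
    return out;
  }
}
```

Prefixing is one way to keep the two sources apart; a typed `EnrichedRecord` with separate fields would serve the same purpose.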

Finally, it is a good idea to switch back to one of the standard WindowFns, such as FixedWindows, since the metadata on the window is no longer needed once it has been copied onto the records.
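After re-windowing, each record's window depends only on its timestamp. FixedWindows (with zero offset) uses windows of the form [N * size, (N + 1) * size), so records from different files with nearby timestamps land in the same window, which is harmless once the metadata lives on the records themselves. A small sketch of that alignment arithmetic, with a hypothetical `FixedWindowMath` helper:

```java
// Computes the [start, end) bounds of the fixed-size, zero-offset window
// that a timestamp falls into: start is the largest multiple of size <= ts.
final class FixedWindowMath {
  static long[] bounds(long timestampMillis, long sizeMillis) {
    long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    return new long[] {start, start + sizeMillis};
  }
}
```

For example, with one-minute windows a record stamped at 125,000 ms falls into [120,000, 180,000).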


You can use Pub/Sub notifications for Cloud Storage directly instead of introducing Object Change Notification (OCN) in the middle.

Google also recommends Pub/Sub notifications over OCN. When you receive a Pub/Sub notification, you can read the message attributes from it:

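from flask import request  # assumes a Flask HTTP endpoint for a push subscription

data = request.get_json()
attributes = data['message']['attributes']

bucket_name = attributes['bucketId']
object_name = attributes['objectId']
generation = attributes['objectGeneration']  # the object's version, not its ID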