I seem to be struggling with this pattern in Beam. This is a streaming pipeline.

At a high level:

  • a message comes in to RabbitMQ
  • the message contents include an ID and N S3 file paths
  • I want to produce some aggregation across all of the listed S3 files, but the results should be keyed by the original message
  • a message is written back to RabbitMQ with the aggregation result, one for each incoming message

Inevitably, I end up with a PCollection[KV[MessageId, S3FilePaths]] and want to apply a bunch of PTransforms to the S3FilePaths without losing the fact that they were originally keyed by MessageId.

I can't seem to find a generic "map the values of a KV PCollection but retain the key" facility, and I believe I have to use a PTransform (as opposed to a DoFn) because file IO has all been implemented as PTransforms.
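To make the dead end concrete, here is roughly what it looks like in Beam's Java API (a sketch, with MessageId and the paths both simplified to String):

    static PCollection<FileIO.ReadableFile> readFilesDroppingKeys(
        PCollection<KV<String, String>> keyedPaths) {
      return keyedPaths
          .apply(Values.<String>create())  // keep only the paths; the MessageId is gone here
          .apply(FileIO.matchAll())        // expand each path into matching file metadata
          .apply(FileIO.readMatches());    // PCollection<ReadableFile>, with no trace of the key
    }

Nothing downstream of Values.create() can recover which MessageId each file came from.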

Am I fundamentally thinking about this the wrong way? Any help is much appreciated.


Update: Sorry for being light on details. My own fault for posting this at the end of a frustrating Friday.

I had a few fundamental stumbling blocks:

  1. I've realized that PCollection[KV[K, V]] is really for combining already-loaded data; trying to treat each V as its own set of pipeline operations didn't really mesh with the API
  2. I didn't have my global windowing/triggering set up appropriately for the task at hand. Moreover, my transforms were not always preserving the window/pane semantics I had assumed.
  3. I have distinct S3 file paths coming in for each message, but I couldn't piece this together with the built-in IO: issues like https://issues.apache.org/jira/browse/BEAM-7753, the FileIO APIs being oriented around PTransforms that don't easily allow tagging results with the incoming message ID, and ReadableFile being impossible to instantiate directly (it's package-private) all got in the way. I ended up wrapping the Java S3 client inside a custom PTransform that preserves the original MessageId along with each returned value (sketched just below this list).
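For what it's worth, a minimal Java sketch of the wrapper from point 3, assuming the AWS SDK v1 client; parseRecords() is a hypothetical decoder for the unsupported format:

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.AmazonS3URI;
    import com.amazonaws.services.s3.model.S3Object;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Maps (MessageId, s3Path) pairs to (MessageId, record) pairs without losing the key.
    static class ReadS3KeepingKey
        extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {

      @Override
      public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
        return input.apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
          private transient AmazonS3 s3;

          @Setup
          public void setup() {
            s3 = AmazonS3ClientBuilder.defaultClient(); // the client is not serializable
          }

          @ProcessElement
          public void processElement(ProcessContext c) throws Exception {
            String messageId = c.element().getKey();
            AmazonS3URI uri = new AmazonS3URI(c.element().getValue());
            try (S3Object obj = s3.getObject(uri.getBucket(), uri.getKey())) {
              // parseRecords() is a hypothetical Iterable<String> decoder over the stream.
              for (String record : parseRecords(obj.getObjectContent())) {
                c.output(KV.of(messageId, record)); // the MessageId survives the read
              }
            }
          }
        }));
      }
    }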

At this point, I have something working end-to-end. My code is actually written in scio, so it's a bit tricky to share, but at a high level:

  1. reading from RabbitMQ using a Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()) trigger in a fixed, 1-second window (sketched after this list), and being careful to use ProcessContext.output to preserve windowing and timestamps throughout
  2. using the general shape PCollection[(MessageId, V)] (Scala Tuple2 syntax) throughout. When V is a path to an S3 file, I apply a PTransform that goes from the path to the contents of the file (it's an unsupported data format)
  3. aggregations are done by grouping to PCollection[KV[(MessageId, FileElementId), FileElement]] and then reducing back down to PCollection[(MessageId, FileElement)], so the semantics of reducing per incoming message are retained.
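The trigger setup from point 1 looks roughly like this in the Beam Java API (a sketch; messages stands for the keyed stream read off RabbitMQ):

    PCollection<KV<String, String>> windowed = messages.apply(
        Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .withAllowedLateness(Duration.ZERO)  // Beam requires lateness and an
            .discardingFiredPanes());            // accumulation mode once a trigger is set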

Number 2 was a tad disappointing to me: I was hoping to be able to use Beam's FileSystems functions to read from a file and pair each output with the message ID it was specified in. But I'm in a good spot now.

Hi there, can you please edit your post with your pipeline code? It would help out immensely. – Cubez

1 Answer

There's no way to apply a transform to a KV<KeyT, ValueT> if that transform only accepts a KeyT or a ValueT. If you need to retain the key while transforming the value, the recommended approach is either to write your own DoFns that accept a KV but operate only on the value, or to restructure your pipeline so that it doesn't depend on transforms that force you to drop the keys.
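For the first option, a minimal sketch of such a DoFn; transformValue() is a hypothetical stand-in for your per-value logic, and keyed is your input PCollection<KV<...>>:

    static class MapValuesKeepKey extends DoFn<KV<String, String>, KV<String, String>> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        KV<String, String> kv = c.element();
        // Operate only on the value; re-emit the key unchanged.
        c.output(KV.of(kv.getKey(), transformValue(kv.getValue())));
      }
    }

    // Applied with ParDo, the keys flow straight through:
    PCollection<KV<String, String>> results = keyed.apply(ParDo.of(new MapValuesKeepKey()));

Emitting via ProcessContext.output also keeps each element's window and timestamp, which matters for the windowing described in the question.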