I seem to be struggling with this pattern in Beam. This is a streaming pipeline.
At a high level:
- a message comes in from RabbitMQ
- the message contents include an ID and N S3 file paths
- I want to produce some aggregation across all the S3 files listed, but the results should be keyed by the original message (rough shape sketched below)
- write a message back to RabbitMQ with the aggregation result, one for each incoming message
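For concreteness, the shape I'm after looks roughly like this in Scio syntax (which is what I'm using); `readS3` and the stand-in aggregation are placeholders for the parts I can't figure out:

```scala
import com.spotify.scio.values.SCollection

object PipelineSketch {
  type MessageId = String
  type S3Path    = String

  // Placeholder per-file reader: this is the part I don't know how to
  // express with Beam's file IO while keeping the key.
  def readS3(path: S3Path): String = ???

  def perMessageAggregates(
      messages: SCollection[(MessageId, List[S3Path])]
  ): SCollection[(MessageId, String)] =
    messages
      .flatMapValues(identity) // fan out: one element per (message, file) pair
      .mapValues(readS3)       // read each file, keeping the message key
      .reduceByKey(_ + _)      // stand-in aggregation back to one result per message
  // ...and each (MessageId, result) pair would be published back to RabbitMQ.
}
```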
Inevitably, I end up with some `PCollection[KV[MessageId, S3FilePaths]]` and want to apply a bunch of `PTransform`s to the `S3FilePaths` without losing the fact that they were originally keyed by `MessageId`.

I can't seem to find a generic "map the values of a KV PCollection but retain the key" functionality, and I believe I have to use a `PTransform` (as opposed to a `DoFn`) because file IO has all been implemented as `PTransform`s.
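For a plain function, the keep-the-key part is easy enough to hand-roll (a sketch; I believe Scio exposes this directly as `mapValues` on keyed collections). The sticking point is that the per-value work I need is a `PTransform`, not a function:

```scala
import com.spotify.scio.coders.Coder
import com.spotify.scio.values.SCollection

object MapValuesSketch {
  // The easy case: element-wise value mapping that keeps the key.
  // The hard case is when `f` needs to be a file-reading PTransform.
  def mapValuesKeepKey[K: Coder, V, W: Coder](
      in: SCollection[(K, V)]
  )(f: V => W): SCollection[(K, W)] =
    in.map { case (k, v) => (k, f(v)) }
}
```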
Am I fundamentally thinking about this the wrong way? Any help is much appreciated.
Update: Sorry for being light on details. My own fault for posting this at the end of a frustrating Friday.
I had a few fundamental stumbling blocks:
- I've realized that `PCollection[KV[K, V]]` is really for combining already-loaded data. Trying to isolate each `V` as a separate set of pipeline operations didn't really mesh with the API, and I didn't have my global windowing/triggering set up appropriately for the task at hand. Moreover, my transforms were not always preserving the window/pane semantics I had assumed.
- I have distinct S3 file paths coming in for each message, but I couldn't piece a solution together from the built-ins: issues like https://issues.apache.org/jira/browse/BEAM-7753, the `FileIO` APIs being oriented around `PTransform`s (which do not easily allow me to tag results with the incoming message ID), and `ReadableFile` not being instantiable directly (it's package-private). I ended up wrapping the Java S3 client inside a custom `PTransform` that preserves the original `MessageId` along with each returned value, sketched below.
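The wrapper boils down to something like this (a trimmed sketch: AWS SDK v1 client, paths assumed to be plain "bucket/key" strings, error handling elided; `ReadS3KeepingKey` is my name for it):

```scala
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, Setup}
import org.apache.beam.sdk.values.KV

import scala.io.Source

// Reads each S3 object directly with the AWS SDK and re-emits its contents
// under the original MessageId, sidestepping FileIO entirely.
class ReadS3KeepingKey extends DoFn[KV[String, String], KV[String, String]] {
  @transient private var s3: AmazonS3 = _

  @Setup
  def setup(): Unit =
    s3 = AmazonS3ClientBuilder.defaultClient()

  @ProcessElement
  def process(c: ProcessContext): Unit = {
    val messageId          = c.element().getKey
    val Array(bucket, key) = c.element().getValue.split("/", 2) // "bucket/key" assumed
    val obj = s3.getObject(bucket, key)
    val body =
      try Source.fromInputStream(obj.getObjectContent).mkString
      finally obj.close()
    // c.output (rather than a fresh context) keeps the element's window/timestamp.
    c.output(KV.of(messageId, body))
  }
}
// Applied via ParDo.of(new ReadS3KeepingKey) inside the custom PTransform.
```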
At this point, I have something working end-to-end. My code is actually written in Scio, so it's a bit tricky to share, but at a high level:
- Reading from RabbitMQ using a `Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())` trigger in a fixed, 1-second window, and being careful to use `ProcessContext.output` to preserve windowing and timestamps throughout (window setup sketched after this list)
- Utilizing the general shape of `PCollection[(MessageId, V)]` (Scala `Tuple2` syntax) throughout. When `V` is a path to an S3 file, I apply a `PTransform` that takes the path and emits the contents of the file (it's an unsupported data format)
- Performing aggregations after grouping to `PCollection[KV[(MessageId, FileElementId), FileElement]]`, then reducing back down to `PCollection[(MessageId, FileElement)]` so that the semantics of reducing per incoming message are retained (keying sketched after this list)
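Roughly, the window/trigger setup from the first bullet looks like this (a sketch; the discarding accumulation mode and zero allowed lateness are my choices, not requirements):

```scala
import com.spotify.scio.values.{SCollection, WindowOptions}
import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, Repeatedly}
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.joda.time.Duration

object Windowing {
  // Fixed 1-second windows, firing repeatedly on processing time.
  def windowed[T](raw: SCollection[T]): SCollection[T] =
    raw.withFixedWindows(
      Duration.standardSeconds(1),
      options = WindowOptions(
        trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()),
        accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
        allowedLateness = Duration.ZERO
      )
    )
}
```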
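And the keying dance from the last bullet, with illustrative types (`FileElement` and `merge` are stand-ins for the real data and reduction):

```scala
import com.spotify.scio.values.SCollection

object Aggregation {
  type MessageId     = String
  type FileElementId = String

  // Stand-ins for the real per-file data and its associative reduction.
  case class FileElement(id: FileElementId, count: Long)
  def merge(a: FileElement, b: FileElement): FileElement =
    a.copy(count = a.count + b.count)

  def perMessage(
      elements: SCollection[(MessageId, FileElement)]
  ): SCollection[(MessageId, FileElement)] =
    elements
      .map { case (msgId, e) => ((msgId, e.id), e) } // composite key per file element
      .reduceByKey(merge)                            // reduce within each (message, element) group
      .map { case ((msgId, _), e) => (msgId, e) }    // drop the element part of the key
      .reduceByKey(merge)                            // final reduction per incoming message
}
```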
Number 2 was a tad disappointing to me. I was hoping to be able to use Beam's `FileSystems` utilities to read each file and pair every output with the message ID it was specified in, but I'm in a good spot now.