I'm writing a Dataflow pipeline that will read from Google Pub/Sub and write the data to Google Cloud Storage:
pipeline.apply(marketData)
.apply(ParDo.of(new PubsubMessageToByteArray()))
.apply(ParDo.of(new ByteArrayToString()))
.apply(ParDo.of(new StringToMarketData()))
.apply(ParDo.of(new AddTimestamps()))
.apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
.withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
.accumulatingFiredPanes())
.apply(ParDo.of(new MarketDataToCsv()))
.apply("Write File(s)", TextIO
.write()
.to(options.getOutputDirectory())
.withWindowedWrites()
.withNumShards(1)
.withFilenamePolicy(new WindowedFilenamePolicy(outputBaseDirectory))
.withHeader(csvHeader));
pipeline.run().waitUntilFinish();
I want to deduplicate and sort the elements in each window before outputting the results. This is different from a typical PTransform in that I want the transform to execute once the window ends.
The Pub/Sub topic will contain duplicates because multiple workers publish the same messages for redundancy in case one of them fails. How do I remove all duplicates within a window before writing? I see that a RemoveDuplicates class existed in Beam version 0.2, but it doesn't exist in the current version.
I understand that, under the hood, Beam parallelizes PTransforms across workers. But since this pipeline writes with withNumShards(1), only one worker writes the final result, which means that in theory it should be possible to have that worker apply a deduplication transform before writing.
The Beam Python SDK still has a RemoveDuplicates method, so I could reproduce that logic in Java, but why would it have been removed from the Java SDK unless there's a better way? I'd imagine the implementation would be a deduplication ParDo that executes after some window trigger.
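For now, the closest I can come up with is to reproduce that dedup logic myself with a GroupByKey instead of a trigger-driven ParDo, since GroupByKey already groups per window and only produces output when the window fires. This is just a sketch: it assumes MarketData has a deterministic coder and sensible equals()/hashCode() so it can serve as a key, and windowed stands for the windowed PCollection<MarketData> from the pipeline above:

// Sketch: key each element by itself, group (per window), keep one element per key.
// Assumes MarketData has a deterministic coder plus equals()/hashCode().
PCollection<MarketData> deduped = windowed
    .apply("KeyByElement", ParDo.of(new DoFn<MarketData, KV<MarketData, Void>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(KV.of(c.element(), (Void) null));
        }
    }))
    .apply("GroupDuplicates", GroupByKey.<MarketData, Void>create())
    .apply("TakeOnePerKey", Keys.<MarketData>create());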
EDIT: GroupByKey and SortValues look like they'll do what I need. I'm attempting to use those now.
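In case it clarifies what I'm attempting: SortValues and BufferedExternalSorter come from the beam-sdks-java-extensions-sorter module, and SortValues orders values by the encoded bytes of the secondary key, so I'm using a zero-padded timestamp string as that key. formatTimestamp is a placeholder for however the element's time gets rendered, and windowed is the same windowed PCollection<MarketData> as above. A rough sketch:

// Key everything in a window under a single dummy key so one worker sees the
// window's full contents; the secondary key is a sortable timestamp string.
// "formatTimestamp" is a placeholder, not a real helper.
PCollection<KV<String, KV<String, MarketData>>> keyed = windowed
    .apply("KeyForSorting", ParDo.of(new DoFn<MarketData, KV<String, KV<String, MarketData>>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(KV.of("window", KV.of(formatTimestamp(c.element()), c.element())));
        }
    }));

PCollection<KV<String, Iterable<KV<String, MarketData>>>> sorted = keyed
    .apply("GroupWindow", GroupByKey.<String, KV<String, MarketData>>create())
    .apply("SortByTimestamp", SortValues.<String, String, MarketData>create(BufferedExternalSorter.options()));
// A final DoFn can then walk each sorted Iterable, skip consecutive duplicates,
// and hand the ordered elements to MarketDataToCsv.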