I'm writing a Dataflow pipeline that will read from Google Pub/Sub and write the data to Google Cloud Storage:
pipeline.apply(marketData)
    .apply(ParDo.of(new PubsubMessageToByteArray()))
    .apply(ParDo.of(new ByteArrayToString()))
    .apply(ParDo.of(new StringToMarketData()))
    .apply(ParDo.of(new AddTimestamps()))
    .apply(Window.<MarketData>into(
            FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
        .withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
        .accumulatingFiredPanes())
    .apply(ParDo.of(new MarketDataToCsv()))
    .apply("Write File(s)", TextIO.write()
        .to(options.getOutputDirectory())
        .withWindowedWrites()
        .withNumShards(1)
        .withFilenamePolicy(new WindowedFilenamePolicy(outputBaseDirectory))
        .withHeader(csvHeader));

pipeline.run().waitUntilFinish();
I want to deduplicate and sort the elements within each window before outputting the results. This is different from a typical PTransform in that I want the transform to run once, when the window closes.
The Pub/Sub topic will contain duplicates because multiple workers produce the same messages, so that nothing is lost if one worker fails. How do I remove all duplicates within a window before writing? I see that a RemoveDuplicates class existed in Beam version 0.2, but not in the current version.
I understand that under the hood, Beam parallelizes PTransforms across workers. But since this pipeline writes with withNumShards(1), only one worker will write the final result, which means that in theory it should be possible to have that worker apply a deduplication transform before writing.
The Beam Python SDK still has a RemoveDuplicates method, so I can reproduce that logic in Java, but why would it have been removed unless there's a better way? I'd imagine the implementation would be a deduplication ParDo that runs after some window trigger fires.
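Roughly what I imagine (an untested sketch: "windowed" stands for the PCollection coming out of the Window transform above, and getId() is a placeholder for whatever field makes two messages count as duplicates):

PCollection<MarketData> deduped = windowed
    .apply("KeyByRecordId", ParDo.of(new DoFn<MarketData, KV<String, MarketData>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Key each element on something that is identical across duplicates
            c.output(KV.of(c.element().getId(), c.element()));
        }
    }))
    // GroupByKey respects the fixed windows, so the grouping happens per window
    .apply(GroupByKey.<String, MarketData>create())
    .apply("TakeOnePerKey", ParDo.of(new DoFn<KV<String, Iterable<MarketData>>, MarketData>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // All values under a key are duplicates of each other, so emit just one
            c.output(c.element().getValue().iterator().next());
        }
    }));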
EDIT: GroupByKey and SortValues look like they'll do what I need. I'm attempting to use those now.
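Something along these lines (a rough sketch building on the deduped collection above; SortValues comes from the beam-sdks-java-extensions-sorter module, and getTimestampString() and the single "all" primary key are placeholders):

PCollection<KV<String, Iterable<KV<String, MarketData>>>> sortedPerWindow = deduped
    .apply("KeyForSorting", ParDo.of(new DoFn<MarketData, KV<String, KV<String, MarketData>>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            MarketData md = c.element();
            // Primary key: one group per window (everything shares "all").
            // Secondary key: a fixed-width, lexicographically sortable timestamp
            // string, which is what SortValues orders the values by.
            c.output(KV.of("all", KV.of(md.getTimestampString(), md)));
        }
    }))
    .apply(GroupByKey.<String, KV<String, MarketData>>create())
    .apply(SortValues.<String, String, MarketData>create(BufferedExternalSorter.options()));

Putting everything on a single key serializes the sort, but since the write already uses withNumShards(1) that seems acceptable; a final ParDo can then unpack the sorted Iterable into CSV lines before the TextIO write.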