
I'm writing a Dataflow pipeline that will read from Google Pub/Sub and write the data to Google Cloud Storage:

    pipeline.apply(marketData)
        .apply(ParDo.of(new PubsubMessageToByteArray()))
        .apply(ParDo.of(new ByteArrayToString()))
        .apply(ParDo.of(new StringToMarketData()))
        .apply(ParDo.of(new AddTimestamps()))
        .apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
                .withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
                .accumulatingFiredPanes())
        .apply(ParDo.of(new MarketDataToCsv()))
        .apply("Write File(s)", TextIO
                .write()
                .to(options.getOutputDirectory())
                .withWindowedWrites()
                .withNumShards(1)
                .withFilenamePolicy(new WindowedFilenamePolicy(outputBaseDirectory))
                .withHeader(csvHeader));

    pipeline.run().waitUntilFinish();

I want to deduplicate and sort the elements within each window before outputting the results. This differs from a typical PTransform in that I want the transform to execute once the window closes.

The Pub/Sub topic will contain duplicates because multiple workers produce the same messages so that processing survives the failure of any one worker. How do I remove all duplicates within a window before writing? I see that a RemoveDuplicates class existed in Beam version 0.2, but it's gone in the current version.

I understand that under the hood, Beam parallelizes PTransforms across workers. But since this pipeline writes with withNumShards(1), only one worker writes the final result, which means it should in theory be possible for that worker to apply a deduplication transform before writing.

The Beam Python SDK still has a RemoveDuplicates transform, so I could reproduce that logic in Java, but why would it have been removed unless there's a better way? I'd imagine the implementation would be a deduplication ParDo executed after some window trigger.

EDIT: GroupByKey and SortValues look like they'll do what I need. I'm attempting to use those now.
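
For reference, here is a minimal sketch of that GroupByKey + SortValues route. It assumes the beam-sdks-java-extensions-sorter dependency, a Serializable MarketData with the hypothetical accessors key() and getEventTime() (matching the answer below), and a hypothetical windowed input collection windowedMarketData; treat it as an outline of the transform shapes, not a drop-in implementation:

    import org.apache.beam.sdk.coders.BigEndianLongCoder;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
    import org.apache.beam.sdk.extensions.sorter.SortValues;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.beam.sdk.values.TypeDescriptors;

    PCollection<KV<String, Iterable<KV<Long, MarketData>>>> sorted = windowedMarketData
            // Key each element by its dedup key, with the event time as a
            // secondary key for sorting.
            .apply(MapElements
                    .into(TypeDescriptors.kvs(
                            TypeDescriptors.strings(),
                            TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptor.of(MarketData.class))))
                    .via((MarketData md) -> KV.of(md.key(), KV.of(md.getEventTime(), md))))
            // SortValues orders by the *encoded bytes* of the secondary key, so
            // use BigEndianLongCoder (order-preserving for non-negative longs)
            // instead of the default VarLongCoder.
            .setCoder(KvCoder.of(StringUtf8Coder.of(),
                    KvCoder.of(BigEndianLongCoder.of(), SerializableCoder.of(MarketData.class))))
            // Group per key within each window...
            .apply(GroupByKey.<String, KV<Long, MarketData>>create())
            // ...then sort each group's values by the secondary key.
            .apply(SortValues.<String, Long, MarketData>create(BufferedExternalSorter.options()));

The output is one sorted Iterable per key and window, which would still need to be flattened (and trimmed of duplicates) before the CSV step.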


1 Answer


Here's an answer for the deduplication part:

    .apply(Distinct
            // MarketData::key produces a String. Use withRepresentativeValueFn()
            // because Apache Beam compares elements by their serialized bytes,
            // which can cause two logically equal objects to be treated as not
            // equal. See org/apache/beam/sdk/transforms/Distinct.java for details.
            .withRepresentativeValueFn(MarketData::key)
            .withRepresentativeType(TypeDescriptor.of(String.class)))
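
If deduplication alone is enough, this step can slot straight into the original pipeline right after the Window transform, so duplicates are removed per window. A minimal sketch of the placement (Distinct lives in org.apache.beam.sdk.transforms and TypeDescriptor in org.apache.beam.sdk.values):

    .apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
            .withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
            .accumulatingFiredPanes())
    // Deduplicate within each window by the representative String key.
    .apply(Distinct.withRepresentativeValueFn(MarketData::key)
            .withRepresentativeType(TypeDescriptor.of(String.class)))
    .apply(ParDo.of(new MarketDataToCsv()))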

And here's a solution for sorting and deduplicating elements within each window (in case sorting is also needed):

    import java.util.Comparator;
    import java.util.List;
    import java.util.TreeSet;
    import com.google.common.collect.Lists;
    import org.apache.beam.sdk.transforms.Combine;

    public static class DedupAndSortByTime extends
            Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> {

        @Override
        public TreeSet<MarketData> createAccumulator() {
            // The TreeSet both sorts and deduplicates: elements that compare
            // equal on (eventTime, orderbookType) are stored only once.
            return new TreeSet<>(Comparator
                    .comparingLong(MarketData::getEventTime)
                    .thenComparing(MarketData::getOrderbookType));
        }

        @Override
        public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) {
            accum.add(input);
            return accum;
        }

        @Override
        public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) {
            TreeSet<MarketData> merged = createAccumulator();
            for (TreeSet<MarketData> accum : accums) {
                merged.addAll(accum);
            }
            return merged;
        }

        @Override
        public List<MarketData> extractOutput(TreeSet<MarketData> accum) {
            return Lists.newArrayList(accum.iterator());
        }
    }
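
One detail worth noting before the updated pipeline: because the input is in fixed (non-global) windows, Combine.globally must be called with .withoutDefaults(), as below; otherwise Beam would try to emit a default output for empty windows, which is only allowed under the global window.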

So the updated pipeline is

    // Pipeline
    pipeline.apply(marketData)
        .apply(ParDo.of(new MarketDataDoFns.PubsubMessageToByteArray()))
        .apply(ParDo.of(new MarketDataDoFns.ByteArrayToString()))
        .apply(ParDo.of(new MarketDataDoFns.StringToMarketDataAggregate()))
        .apply(ParDo.of(new MarketDataDoFns.DenormalizeMarketDataAggregate()))
        .apply(ParDo.of(new MarketDataDoFns.AddTimestamps()))
        .apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
                .withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
                .accumulatingFiredPanes())
        .apply(Combine.globally(new MarketDataCombineFn.DedupAndSortByTime()).withoutDefaults())
        .apply(ParDo.of(new MarketDataDoFns.MarketDataToCsv()))
        .apply("Write File(s)", TextIO
                .write()
                // This doesn't set the output directory as expected. 
                // "/output" gets stripped and I don't know why,
                // so "/output" has to be added to the directory path 
                // within the FilenamePolicy.
                .to(options.getOutputDirectory())
                .withWindowedWrites()
                .withNumShards(1)
                .withFilenamePolicy(new MarketDataFilenamePolicy.WindowedFilenamePolicy(outputBaseDirectory))
                .withHeader(csvHeader));

    pipeline.run().waitUntilFinish();