
I have a Dataflow pipeline that reads from a Pub/Sub topic, performs transformations, and writes to Bigtable. I want the elements read from Pub/Sub to be processed in order of their sequence number.

I am using a fixed window of 2 minutes and then applying a GroupByKey over it. After the GroupByKey I'm using a SortValues transform, which sorts the Iterable on the sequence number. I'm observing a high wall time for the GroupByKey step, as all the elements within a window are being processed on the same worker. Is there an efficient way to sort elements within a fixed window?

Following is my pipeline code:

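import java.math.BigInteger;
import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.extensions.sorter.SortValues;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.json.JSONObject; // assumption: org.json's JSONObject

PCollection<PubsubMessage> pubsubRecords = p.apply(PubsubIO.readMessagesWithAttributes()
        .fromTopic(StaticValueProvider.of(topic)));
// LogsFn (not shown) parses each message into KV<sequenceNumber, payload>.
PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords
        .apply("Raw to String", ParDo.of(new LogsFn()))
        .apply("Window", Window
                .<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardMinutes(2)))
                // Fire a pane 2 minutes (processing time) after its first element arrives.
                .triggering(Repeatedly.forever(AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(2))))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());
// A constant key sends every element of a window to a single worker.
PCollection<KV<String, KV<BigInteger, JSONObject>>> keyedWindow = window
        .apply(WithKeys.of(new SerializableFunction<KV<BigInteger, JSONObject>, String>() {
            @Override
            public String apply(KV<BigInteger, JSONObject> row) {
                return "key";
            }
        }));

// Group per key and window, then sort each group's values by sequence number.
PCollection<KV<String, Iterable<KV<BigInteger, JSONObject>>>> groupedWindow = keyedWindow
        .apply(GroupByKey.<String, KV<BigInteger, JSONObject>>create())
        .apply(SortValues.<String, BigInteger, JSONObject>create(BufferedExternalSorter.options()));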
Is there a reason why you have to sort all values rather than just getting the largest/smallest values? What's the use case that requires ordered elements? - Rui Wang
I am replicating database operations from DynamoDB. I'm reading DynamoDB Streams into a Pub/Sub topic. The events have to be sorted because they are all database operations. - Akshay Apte

1 Answer


I think your approach is correct. It's unavoidable that all the elements that must be sorted relative to each other end up on the same worker. Ordered processing creates dependencies between data elements and often doesn't work well with distributed computing.
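One way to reduce the size of that single group, assuming that for this DynamoDB replication only operations on the same item must be applied in order (cross-item ordering would still require one key), is to key by the item instead of the constant "key". Each item's operations are then grouped and sorted independently, and different items can be spread across workers; in the pipeline above this would mean returning the item's key from the WithKeys function. The plain-Java sketch below shows the grouping; `Op`, `itemKey`, and `groupAndSort` are hypothetical names, not part of the original pipeline.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PerItemSortSketch {
    // Hypothetical stand-in for one replicated database operation.
    static final class Op {
        final String itemKey;            // e.g. the primary key of the modified item
        final BigInteger sequenceNumber;
        final String action;

        Op(String itemKey, BigInteger sequenceNumber, String action) {
            this.itemKey = itemKey;
            this.sequenceNumber = sequenceNumber;
            this.action = action;
        }
    }

    // Groups operations by item key (like WithKeys + GroupByKey with a per-item key)
    // and sorts each group by sequence number. Groups are independent of each other,
    // so in a pipeline they could be processed on different workers.
    static Map<String, List<Op>> groupAndSort(List<Op> ops) {
        Map<String, List<Op>> groups = new TreeMap<>();
        for (Op op : ops) {
            groups.computeIfAbsent(op.itemKey, k -> new ArrayList<>()).add(op);
        }
        for (List<Op> group : groups.values()) {
            group.sort(Comparator.comparing((Op o) -> o.sequenceNumber));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Op> ops = List.of(
                new Op("user#2", new BigInteger("300"), "DELETE"),
                new Op("user#1", new BigInteger("200"), "MODIFY"),
                new Op("user#1", new BigInteger("100"), "INSERT"),
                new Op("user#2", new BigInteger("150"), "INSERT"));
        groupAndSort(ops).forEach((key, group) -> {
            StringBuilder line = new StringBuilder(key + ":");
            for (Op op : group) {
                line.append(' ').append(op.action);
            }
            System.out.println(line);
        });
    }
}
```

Running `main` prints `user#1: INSERT MODIFY` and `user#2: INSERT DELETE`. Whether per-item ordering is sufficient depends on the target schema; if operations on different items interact, the single-key approach is still needed.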