I have a dataflow pipeline which reads from a pubsub topic, performs transformations and writes to BigTable. I want the elements read from pubsub to be processed in order of their sequence number.
I am using a fixed window of 2 minutes and then applying a GroupByKey over it. After GBK I'm using a SortValues transform which sorts the Iterable on SequenceNumber. I'm observing the wall time of the GroupByKey step to be high, as all the elements within a window are being processed on a same worker. Is there an efficient way to sort elements within a Fixed Window?
Following is my pipeline code:
PCollection<PubsubMessage> pubsubRecords = p.apply(PubsubIO.readMessagesWithAttributes()
.fromTopic(StaticValueProvider.of(topic)));
PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords.apply("Raw to String", ParDo.of(new LogsFn()))
.apply("Window", Window
.<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.StandardMinutes(2))
)
)
.withAllowedLateness(Duration.ZERO).discardingFiredPanes()
);
PCollection<KV<String, KV<BigInteger, JSONObject>>> keyedWindow = window
.apply(WithKeys.of(new SerializableFunction<KV<BigInteger, JSONObject>,String>() {
@Override
public String apply(KV<BigInteger, JSONObject> row) {
return "key";
}
}));
PCollection<KV<String, Iterable<KV<BigInteger, JSONObject>>>> groupedWindow = keyedWindow
.apply(GroupByKey.<String, KV<BigInteger, JSONObject>>create()).apply(
SortValues.<String, BigInteger, JSONObject>create(BufferedExternalSorter.options()));