I have a Dataflow pipeline that reads from a Pub/Sub topic, performs transformations, and writes to Bigtable. I want the elements read from Pub/Sub to be processed in order of their sequence numbers.
I am using 2-minute fixed windows and then applying a GroupByKey. After the GroupByKey I apply a SortValues transform, which sorts each grouped Iterable by sequence number. The wall time of the GroupByKey step is high because all the elements within a window are processed on the same worker: every element is assigned the same constant key (`"key"`), so each window produces a single group. Is there an efficient way to sort elements within a fixed window?
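To illustrate why each window ends up on a single worker, here is a minimal plain-Java sketch (no Beam dependency) that assigns event timestamps to 2-minute fixed windows and then groups them by (window, key). It assumes, as in the `WithKeys` function of the pipeline code, that every element gets the constant key `"key"`. The class and method names (`WindowKeyDemo`, `windowStart`, `groupByWindowAndKey`) are hypothetical and only model the behavior; they are not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WindowKeyDemo {
    // Two-minute fixed windows, matching Duration.standardMinutes(2) in the pipeline.
    static final long WINDOW_MILLIS = 2 * 60 * 1000L;

    // Start of the fixed window (offset 0) that contains the timestamp.
    // Assumes non-negative epoch-millisecond timestamps.
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Models WithKeys + GroupByKey: every element gets the constant key "key",
    // so the group id depends only on the window the element falls into.
    static Map<String, List<Long>> groupByWindowAndKey(List<Long> timestamps) {
        Map<String, List<Long>> groups = new TreeMap<>();
        for (long ts : timestamps) {
            String groupId = windowStart(ts) + "/key";
            groups.computeIfAbsent(groupId, id -> new ArrayList<>()).add(ts);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(0L, 30_000L, 119_999L, 120_000L, 200_000L);
        // One group per window: every element of a window lands in the same group.
        groupByWindowAndKey(timestamps)
            .forEach((id, members) -> System.out.println(id + " -> " + members.size() + " elements"));
    }
}
```

Running `main` prints `0/key -> 3 elements` and `120000/key -> 2 elements`: with a single constant key, the number of groups equals the number of non-empty windows, so each group (and the work of sorting it) cannot be split across workers.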
Following is my pipeline code:
```java
// `topic` is a placeholder for the input topic path, which is not shown here.
PCollection<PubsubMessage> pubsubRecords = p.apply(PubsubIO.readMessagesWithAttributes()
        .fromTopic(topic));
PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords
        .apply("Raw to String", ParDo.of(new LogsFn()))
        .apply("Window", Window
                .<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardMinutes(2))));
PCollection<KV<String, KV<BigInteger, JSONObject>>> keyedWindow = window
        .apply(WithKeys.of(new SerializableFunction<KV<BigInteger, JSONObject>, String>() {
            @Override
            public String apply(KV<BigInteger, JSONObject> row) {
                return "key"; // the same constant key for every element
            }
        }));
PCollection<KV<String, Iterable<KV<BigInteger, JSONObject>>>> groupedWindow = keyedWindow
        .apply(GroupByKey.<String, KV<BigInteger, JSONObject>>create())
        .apply(SortValues.<String, BigInteger, JSONObject>create(BufferedExternalSorter.options()));
```
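Conceptually, the SortValues step orders the values of each (key, window) group by their secondary key, here the `BigInteger` sequence number. The plain-Java sketch below shows that ordering for a single group. It assumes the `JSONObject` payload is replaced by a `String` to keep the example dependency-free, and `SortBySequenceDemo` and `sortGroup` are hypothetical names, not part of Beam.

```java
import java.math.BigInteger;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class SortBySequenceDemo {
    // Copies one group's values into a list and sorts them ascending by
    // sequence number (the entry key). Payloads are Strings here, standing in for JSONObject.
    static List<Map.Entry<BigInteger, String>> sortGroup(Iterable<Map.Entry<BigInteger, String>> group) {
        List<Map.Entry<BigInteger, String>> sorted = new ArrayList<>();
        group.forEach(sorted::add);
        sorted.sort(Map.Entry.comparingByKey());
        return sorted;
    }

    public static void main(String[] args) {
        List<Map.Entry<BigInteger, String>> group = List.of(
            new SimpleEntry<>(BigInteger.valueOf(3), "{\"seq\":3}"),
            new SimpleEntry<>(BigInteger.valueOf(1), "{\"seq\":1}"),
            new SimpleEntry<>(BigInteger.valueOf(2), "{\"seq\":2}"));
        // Prints the entries in sequence-number order.
        for (Map.Entry<BigInteger, String> e : sortGroup(group)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

This sketch sorts entirely in memory, so it only works when a whole group fits on one worker; `SortValues` with `BufferedExternalSorter` can instead spill to local disk when a group exceeds its memory buffer. Either way, the sort still runs once per group, so it does not remove the single-group-per-window bottleneck.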