1
votes

In SDK 1.9.1, the Pubsub source offered the PubsubIO.Read.maxReadTime and PubsubIO.Read.maxNumRecords methods. They made it possible to create a bounded collection from Pubsub messages, so a Dataflow pipeline could be started in batch mode.

How can something similar be achieved with Dataflow SDK 2.1? How can I read from Pubsub in a Dataflow pipeline running in batch mode?

2
Have you checked this? beam.apache.org/documentation/sdks/javadoc/2.1.0/index.html?org/… I've been using Scio more lately and don't remember "pure" Beam well, but it seems similar to what you're looking for. However, it seems that it must be placed after the PubsubIO.Read - MaC
Indeed, that looks very similar, but how could it be applied to the pipeline? The only way to use it is to have access to the source, which is buried deep in the SDK and is lost once it has been passed to apply. It could be used by the developers who provide PubsubIO.Read, but how could it be used by developers who only use the PubsubIO.Read API? - Marcin Pietraszek

2 Answers

1
votes

You should not attempt to use the PubsubReader in a batch context. Instead, you should use the streaming PubsubIO provided, and set a windowing strategy as described here. You can use the composite trigger described in the “Other composite triggers” section (copied below) to get the behavior you want.

Repeatedly.forever(AfterFirst.of(
      AfterPane.elementCountAtLeast(100),
      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
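
To make the firing rule of that trigger concrete, here is a small library-free model of it. This is not Beam code: PaneSimulator and its parameters are hypothetical names made up for illustration, and it only assumes the behaviour stated above, namely that a pane fires once it holds a given number of elements or once a given delay has passed since its first element arrived, whichever happens first, and that the cycle then repeats.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, library-free model of the firing rule; not part of Beam. */
final class PaneSimulator {
    private PaneSimulator() {}

    /**
     * Groups arrival times (processing-time millis, ascending) into panes.
     * A pane fires when it holds maxCount elements, or once delayMillis has
     * passed since its first element arrived, whichever happens first.
     */
    static List<List<Long>> panes(long[] arrivalMillis, int maxCount, long delayMillis) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : arrivalMillis) {
            // The timer started by the pane's first element fires before
            // an element that arrives after the delay is added.
            if (!current.isEmpty() && t >= current.get(0) + delayMillis) {
                fired.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
            // Count condition: fire as soon as the pane is full.
            if (current.size() >= maxCount) {
                fired.add(current);
                current = new ArrayList<>();
            }
        }
        // The pending timer eventually fires whatever is left.
        if (!current.isEmpty()) {
            fired.add(current);
        }
        return fired;
    }
}
```

For example, with maxCount = 100 and delayMillis = 60000, arrivals at 0, 10 and 61000 ms end up in two panes: the first two elements are emitted by the one-minute timer, and the third starts a new pane.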
-1
votes

Unfortunately, I didn't see any support for that in the new versions of the SDK. What I did was implement a DoFn that reads from PubSub until maxReadTime has elapsed or maxNumRecords messages have been read, and then returns the messages.

That is what the previous versions of the SDK did. You can check the PubsubReader class.

You will have to call it like this:

 // MyPubsubReader is the custom DoFn described above.
 pipeline.begin()
            .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
            .apply(ParDo.of(new MyPubsubReader(maxNumRecords, maxReadTime)))
            .setCoder(coder);
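
The stopping rule inside such a DoFn can be sketched independently of any SDK. The helper below is only an illustration under stated assumptions: BoundedPull, the Supplier-based source and the injectable clock are hypothetical names, not Beam or Dataflow APIs, and the real Pubsub pull call would have to be plugged in as the source. It collects messages until maxNumRecords have been read or maxReadTime has elapsed, whichever comes first.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical helper; not part of Beam or the Dataflow SDK. */
final class BoundedPull {
    private BoundedPull() {}

    /**
     * Pulls from source until maxNumRecords messages are collected or
     * maxReadTime has elapsed on nanoClock, whichever comes first.
     * An empty Optional means "nothing available right now".
     */
    static <T> List<T> read(Supplier<Optional<T>> source, int maxNumRecords,
                            Duration maxReadTime, LongSupplier nanoClock) {
        List<T> out = new ArrayList<>();
        long deadline = nanoClock.getAsLong() + maxReadTime.toNanos();
        // Subtract before comparing so the check is safe for nanoTime values.
        while (out.size() < maxNumRecords && nanoClock.getAsLong() - deadline < 0) {
            // A real reader would block or back off here instead of spinning.
            source.get().ifPresent(out::add);
        }
        return out;
    }
}
```

Inside the DoFn's processing method you would call something like BoundedPull.read(pullOneMessage, maxNumRecords, maxReadTime, System::nanoTime), where pullOneMessage stands for your own Pubsub pull, and output each returned message.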