I'm currently using Apache Beam with Google Dataflow to process real-time data. The data comes from Google PubSub, which is unbounded, so I'm using a streaming pipeline. However, it turns out that keeping a streaming pipeline running 24/7 is quite expensive. To reduce cost, I'm thinking of switching to a batch pipeline that runs at a fixed interval (e.g. every 30 minutes), since it's not really important to the user that the processing happens in real time.
I'm wondering whether it's possible to use a PubSub subscription as a bounded source. My idea is that each time the job runs, it accumulates data for 1 minute before triggering. So far this does not seem possible, but I've come across a class called BoundedReadFromUnboundedSource (which I have no idea how to use), so maybe there is a way?
Below is roughly what the source looks like:
PCollection<MyData> data = pipeline
    .apply("ReadData", PubsubIO
        .readMessagesWithAttributes()
        .fromSubscription(options.getInput()))
    .apply("ParseData", ParDo.of(new ParseMyDataFn()))
    .apply("Window", Window
        .<MyData>into(new GlobalWindows())
        .triggering(Repeatedly
            .forever(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(5))
            )
        )
        .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
    );
I tried to do the following, but the job still runs in streaming mode:
PCollection<MyData> data = pipeline
    .apply("ReadData", PubsubIO
        .readMessagesWithAttributes()
        .fromSubscription(options.getInput()))
    .apply("ParseData", ParDo.of(new ParseMyDataFn()))
    // Is there a way to make the window trigger once and turn this into a bounded source?
    .apply("Window", Window
        .<MyData>into(new GlobalWindows())
        .triggering(AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1))
        )
        .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
    );
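To make the behaviour I'm after concrete, here is a minimal plain-Java sketch with no Beam involved. It reads from an unbounded source until either a record cap or a read-time limit is reached, then stops and returns a finite batch. The names `BoundedDrain` and `drain` are hypothetical, and the `BlockingQueue` is only a stand-in for the subscription; this is an illustration of the idea, not how Beam or BoundedReadFromUnboundedSource actually implements it.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: turns an unbounded queue into one bounded batch. */
public class BoundedDrain {

    /**
     * Collects elements until maxRecords have been read or maxReadTime has
     * elapsed, whichever happens first, and then returns what was collected.
     */
    public static <T> List<T> drain(BlockingQueue<T> source, int maxRecords, Duration maxReadTime)
            throws InterruptedException {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + maxReadTime.toNanos();
        while (batch.size() < maxRecords) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // read-time limit reached
            }
            // Wait for the next element, but never past the deadline.
            T item = source.poll(remaining, TimeUnit.NANOSECONDS);
            if (item == null) {
                break; // nothing arrived before the deadline
            }
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the subscription (assumption: messages are plain strings).
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");

        // Stops after two records even though a third is available.
        List<String> batch = drain(queue, 2, Duration.ofMillis(200));
        System.out.println(batch); // prints [a, b]
    }
}
```

In the real job I would want the equivalent of `drain(subscription, someCap, Duration.ofMinutes(1))` to happen once per run, after which the pipeline finishes instead of staying up.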