3 votes

I'm currently using Apache Beam with Google Dataflow to process real-time data. The data comes from Google PubSub, which is unbounded, so I'm currently using a streaming pipeline. However, it turns out that having a streaming pipeline running 24/7 is quite expensive. To reduce cost, I'm thinking of switching to a batch pipeline that runs at a fixed interval (e.g. every 30 minutes), since it's not really important for the processing to be real time for the user.

I'm wondering if it's possible to use a PubSub subscription as a bounded source. My idea is that each time the job runs, it would accumulate data for 1 minute before triggering. So far this does not seem possible, but I've come across a class called BoundedReadFromUnboundedSource (which I have no idea how to use), so maybe there is a way?
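
From what I can tell, BoundedReadFromUnboundedSource is normally produced by Read.from(unboundedSource).withMaxReadTime(...) (or .withMaxNumRecords(...)). Below is a minimal sketch of that mechanism; it uses CountingSource.unbounded() purely as a stand-in, because PubsubIO does not seem to publicly expose its underlying UnboundedSource:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class BoundedReadSketch {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Read.from(unboundedSource).withMaxReadTime(...) wraps the unbounded
        // source in a BoundedReadFromUnboundedSource, so downstream transforms
        // see a bounded PCollection. CountingSource.unbounded() is only a
        // stand-in here, not the Pub/Sub source.
        PCollection<Long> bounded = pipeline.apply(
                "BoundedRead",
                Read.from(CountingSource.unbounded())
                        .withMaxReadTime(Duration.standardMinutes(1)));

        pipeline.run().waitUntilFinish();
    }
}

If something equivalent were available for PubsubIO, that would match what I'm after.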

Below is roughly what the source looks like:

PCollection<MyData> data = pipeline
            .apply("ReadData", PubsubIO
                    .readMessagesWithAttributes()
                    .fromSubscription(options.getInput()))
            .apply("ParseData", ParDo.of(new ParseMyDataFn()))
            .apply("Window", Window
                    .<MyData>into(new GlobalWindows())
                    .triggering(Repeatedly
                            .forever(AfterProcessingTime
                                    .pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardSeconds(5))
                            )
                    )
                    .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
            );

I tried to do the following, but the job still runs in streaming mode:

PCollection<MyData> data = pipeline
            .apply("ReadData", PubsubIO
                    .readMessagesWithAttributes()
                    .fromSubscription(options.getInput()))
            .apply("ParseData", ParDo.of(new ParseMyDataFn()))

            // Is there a way to make the window trigger once and turn it into a bounded source?
            .apply("Window", Window
                    .<MyData>into(new GlobalWindows())
                    .triggering(AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1))
                    )
                    .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
            );
Comments:

Add relevant parts of your current code to get better answers. – Tahir Akhtar

@TahirAkhtar I've added some code to better illustrate my question. – Hasyimi Bahrudin

@HasyimiBahrudin did you find a way to do this? I have this running on a direct runner but can't get it to work on Google Data Flow. – Campey

1 Answer

0 votes

This is not explicitly supported in PubsubIO currently. However, you could try periodically starting a streaming job and programmatically invoking Drain on it a few minutes later.
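
Roughly, a sketch of that approach could look like the following. It assumes the DataflowRunner (so pipeline.run() returns a DataflowPipelineJob) and issues the drain by shelling out to gcloud dataflow jobs drain; the helper name, the 5-minute wait, and the region argument are just placeholders:

import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.joda.time.Duration;

// Hypothetical helper: launch the streaming pipeline, let it consume the
// Pub/Sub backlog for a few minutes, then ask Dataflow to drain the job.
static void runAndDrain(Pipeline pipeline, String region) throws Exception {
    PipelineResult result = pipeline.run();

    // With the DataflowRunner, the result is a DataflowPipelineJob, which
    // exposes the job id needed to drain it later.
    String jobId = ((DataflowPipelineJob) result).getJobId();

    Thread.sleep(Duration.standardMinutes(5).getMillis());

    // Same effect as clicking "Drain" in the Dataflow console: the job stops
    // pulling from Pub/Sub and flushes in-flight data before shutting down.
    new ProcessBuilder("gcloud", "dataflow", "jobs", "drain", jobId, "--region", region)
            .inheritIO()
            .start()
            .waitFor();
}

How the periodic start itself is scheduled (cron, Cloud Scheduler, etc.) is left outside the pipeline.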