
We use the Python Beam SDK with GCP's Dataflow. Our pipeline depends on an external system that we know has some delay. How can I write a pipeline that waits for N minutes (where N is a constant I provide when launching the job)?

Something like: Pub/Sub -> (sleep for 1 minute) -> read data from external system

My understanding of "FixedWindows" is that it groups data into time frames, so with a 60-second fixed window I can achieve an "up to 60 seconds" delay, but what I want here is a constant 60-second delay for all incoming data.


2 Answers

2
votes

Since the windowing question was answered by @Kenn Knowles, allow me to answer the other half.

I think you could use stateful and timely processing, with a one-minute Timer for every element.

Bear in mind that timers are applied per key, so each key needs to be unique for this to work. I made this code sample so you can test it; it reads from the topic projects/pubsub-public-data/topics/taxirides-realtime.

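import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Class and option names are illustrative; run with --streaming --topic=<topic>
public class DelayedRelease {
    private static final Logger LOG = LoggerFactory.getLogger(DelayedRelease.class);

    public interface Options extends PipelineOptions {
        @Description("Pub/Sub topic to read from")
        String getTopic();
        void setTopic(String value);
    }

    public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        Pipeline p = Pipeline.create(options);

        p
            .apply("Read From PubSub", PubsubIO.readStrings().fromTopic(options.getTopic()))
            .apply("Parse and to KV", ParDo.of(new DoFn<String, KV<String, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    JSONObject json = new JSONObject(c.element());
                    String rideStatus = json.getString("ride_status");

                    // ride_id is unique for dropoff events, so it works as a per-element key
                    String rideId = json.getString("ride_id");

                    if (rideStatus.equals("dropoff")) {
                        c.output(KV.of(rideId, "value"));
                    }
                }
            }))
            // A stateful DoFn needs KV input: state and timers are scoped per key
            .apply("Timer", ParDo.of(new DoFn<KV<String, String>, String>() {

                private final Duration BUFFER_TIME = Duration.standardSeconds(60);

                @TimerId("timer")
                private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

                // Elements are saved here, with type String
                @StateId("buffer")
                private final StateSpec<BagState<String>> bufferedEvents = StateSpecs.bag();

                @ProcessElement
                public void processElement(ProcessContext c,
                                           @TimerId("timer") Timer timer,
                                           @StateId("buffer") BagState<String> buffer) {
                    // Keys are unique, so no counter is needed to avoid re-setting the timer
                    timer.offset(BUFFER_TIME).setRelative();

                    buffer.add(c.element().getKey()); // add the unique id to the buffer
                    LOG.info("TIMER: Adding " + c.element().getKey()
                            + " to buffer at " + Instant.now().toString());
                }

                // Called when the timer fires, BUFFER_TIME after the element was processed
                @OnTimer("timer")
                public void onTimer(OnTimerContext c,
                                    @StateId("buffer") BagState<String> buffer) {
                    for (String id : buffer.read()) { // only one entry, since the key is unique
                        LOG.info("TIMER: Releasing " + id
                                + " from buffer at " + Instant.now().toString());
                        c.output(id);
                    }
                    buffer.clear(); // clear the buffer
                }
            }));

        p.run();
    }
}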

This was just a quick test, so there are probably things to improve in the code.

I am not sure, though, how this would perform with a lot of elements, since every element is held in state for one minute with its own timer. I'm currently running this pipeline in Dataflow and so far so good; I will update this if something weird happens.
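For a rough sense of scale, assuming a steady arrival rate λ and a fixed delay W, Little's law gives the number of elements held in state (each with its own pending timer) at any moment. The rate used here is a hypothetical figure, not a measurement from this pipeline:

```latex
N \approx \lambda \cdot W, \qquad \lambda = 1000\ \tfrac{\text{elements}}{\text{s}},\quad W = 60\ \text{s} \;\Rightarrow\; N \approx 60\,000
```

So the buffered state grows linearly with both the input rate and the configured delay.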

The advantage of this over using sleeps is that a sleep blocks processing, so the elements in a bundle would wait one after another, while here the waits happen in parallel. The disadvantage may be the extra shuffle (keyed state requires the data to be partitioned by key), but I haven't tested this enough to be sure.
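To make the sleep-vs-timer difference concrete, here is a small plain-Java model (not Beam code). It is an idealization that assumes all elements of a bundle arrive at t = 0 and that sleeps run one after another on a single thread; the class and method names are hypothetical.

```java
import java.util.Arrays;

// Idealized model (not Beam code): when is each element of one bundle released?
// Times are in seconds from the start of the bundle; all elements arrive at t = 0.
class SleepVsTimer {

    // Sleeping inside processElement blocks the bundle: element i can only be
    // emitted after i + 1 consecutive sleeps have finished.
    static long[] releaseWithSleep(int elements, long delaySeconds) {
        long[] release = new long[elements];
        for (int i = 0; i < elements; i++) {
            release[i] = (i + 1) * delaySeconds;
        }
        return release;
    }

    // With one timer per element, all timers count down concurrently, so every
    // element is released delaySeconds after it arrived.
    static long[] releaseWithTimers(int elements, long delaySeconds) {
        long[] release = new long[elements];
        Arrays.fill(release, delaySeconds);
        return release;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(releaseWithSleep(3, 60)));  // [60, 120, 180]
        System.out.println(Arrays.toString(releaseWithTimers(3, 60))); // [60, 60, 60]
    }
}
```

In the sleep model the last element's wait grows with the bundle size; in the timer model it stays at the configured delay.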

Note that in "normal" stateful DoFns (1) keys are not expected to be unique, so more than one element can be added to the same bag, and (2) you need a counter or flag to know whether the timer has already been set, because setting a timer again replaces its previous firing time and every new element would push the release back. Here we didn't need one, since the keys are unique.
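The effect of non-unique keys can be seen with a small in-memory model (plain Java, not Beam; `KeyedDelayModel` and its methods are hypothetical). It assumes, as in Beam, that setting an already-set timer replaces its firing time. The `guardTimer` flag stands in for the counter/flag from the note; in a real DoFn that would typically be kept in a `ValueState`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model (not Beam code) of a keyed DoFn with one
// processing-time timer and one bag per key. Times are in seconds.
class KeyedDelayModel {
    private final long delay;
    private final boolean guardTimer; // true = only set the timer if it is not already set
    private final Map<String, Long> timers = new HashMap<>();
    private final Map<String, List<String>> bags = new HashMap<>();

    KeyedDelayModel(long delay, boolean guardTimer) {
        this.delay = delay;
        this.guardTimer = guardTimer;
    }

    // Mirrors processElement: buffer the value and (re)set the key's timer.
    void process(String key, String value, long now) {
        bags.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        if (!guardTimer || !timers.containsKey(key)) {
            timers.put(key, now + delay); // replaces any earlier firing time
        }
    }

    // Mirrors onTimer: release and clear every bag whose timer is due at `now`.
    List<String> advanceTo(long now) {
        List<String> released = new ArrayList<>();
        timers.entrySet().removeIf(e -> {
            if (e.getValue() > now) {
                return false;
            }
            released.addAll(bags.remove(e.getKey()));
            return true;
        });
        return released;
    }

    public static void main(String[] args) {
        KeyedDelayModel m = new KeyedDelayModel(60, false);
        m.process("k", "a", 0);
        m.process("k", "b", 30);
        System.out.println(m.advanceTo(60)); // [] (timer was pushed back to 90)
        System.out.println(m.advanceTo(90)); // [a, b]
    }
}
```

With a 60 s delay and key "k" receiving elements at t = 0 and t = 30, the unguarded timer moves to t = 90, so the first element waits 90 s; with the guard both are released at t = 60, so the second waits only 30 s. Only unique keys give every element exactly the configured delay, which is why the sample keys by `ride_id`.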

Here you have a screenshot of the pipeline working


1
votes

FixedWindows does not introduce any delay.

In Beam, windowing groups elements according to their timestamps. This is separate from when the elements arrive.
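As an illustration, the fixed window an element lands in depends only on its event timestamp. The plain-Java sketch below (hypothetical names; assumes zero window offset and non-negative epoch milliseconds) computes the aligned window start:

```java
// Sketch (not Beam code): fixed-window assignment from an element's event
// timestamp, assuming zero offset and non-negative epoch millis.
// Arrival time is not an input, so windowing by itself adds no delay.
class FixedWindowMath {

    // Start of the [start, start + size) window that contains timestampMillis.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long size = 60_000; // 60-second windows
        System.out.println(windowStart(61_000, size));  // 60000
        System.out.println(windowStart(119_999, size)); // 60000
        System.out.println(windowStart(120_000, size)); // 120000
    }
}
```

Elements stamped 61 s and 119.999 s fall into the same [60 s, 120 s) window whenever they arrive, which is the "up to 60 seconds" spread described in the question rather than a constant delay.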

The PubsubIO transform maintains a "watermark", which tracks the oldest timestamps still remaining in the Pub/Sub queue. So the watermark will lag real time by 1 minute.
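A toy model of this idea (plain Java, hypothetical names; the real Pub/Sub watermark in Dataflow is an estimate, not an exact minimum) treats the watermark as the oldest event timestamp still in the backlog, falling back to the current time when the backlog is empty:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model (not how PubsubIO computes it): the watermark is the oldest event
// timestamp still waiting in the backlog, or "now" when the backlog is empty.
// Times are in seconds.
class WatermarkModel {

    static long watermark(List<Long> backlogTimestamps, long now) {
        return backlogTimestamps.isEmpty() ? now : Collections.min(backlogTimestamps);
    }

    public static void main(String[] args) {
        long now = 1_000;
        // Messages published 60, 30 and 0 seconds ago are still queued.
        List<Long> backlog = Arrays.asList(940L, 970L, 1_000L);
        System.out.println(now - watermark(backlog, now)); // 60 (seconds of lag)
        // With an empty backlog the watermark catches up with real time.
        System.out.println(now - watermark(Collections.<Long>emptyList(), now)); // 0
    }
}
```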

If the Pub/Sub topic stays empty for a long time, the watermark will catch up with real time, so delayed elements that arrive afterwards will be behind the watermark. In that case you may need to allow late data in your pipeline.
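A sketch of the lateness check under a simplified model of Beam's rule (plain Java, hypothetical names; it assumes data for a window is dropped once the window's maximum timestamp plus the allowed lateness is behind the watermark). In the Java SDK the lateness itself is configured with `Window.into(...).withAllowedLateness(...)`.

```java
// Simplified model (not Beam code) of the late-data rule. Times are epoch millis.
// A window [start, end) has maxTimestamp = end - 1; its data is dropped once
// maxTimestamp + allowedLateness falls behind the watermark.
class LatenessModel {

    static boolean isDroppable(long windowEnd, long allowedLatenessMillis, long watermark) {
        long maxTimestamp = windowEnd - 1;
        return maxTimestamp + allowedLatenessMillis < watermark;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000;  // window [0 s, 60 s)
        long watermark = 125_000; // watermark caught up after the topic went idle
        System.out.println(isDroppable(windowEnd, 0, watermark));       // true: dropped
        System.out.println(isDroppable(windowEnd, 120_000, watermark)); // false: kept with 2 min lateness
    }
}
```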