I'm struggling to get event-time triggers to fire for my Apache Beam pipeline, but I do seem to be able to trigger window firing with processing time.

My pipeline is fairly basic:

  1. I receive batches of data points from Pub/Sub, each containing millisecond-level timestamps; the message is published with a timestamp attribute slightly earlier than the earliest batched data point. Data is batched to reduce client-side workload and Pub/Sub expenses.

  2. I extract second-level timestamps and apply event timestamps to the individual data points.

  3. I window the data for processing, avoiding the global window.

  4. I group the data by second, for later categorization of each second of streaming data.

  5. I eventually use sliding windows on the categorized seconds to conditionally emit one of two messages to Pub/Sub once per second.

My problem appears to be in step 3.

I'm trying to use the same windowing strategy in step 3 that I'll eventually use in step 5 to run a sliding-average calculation on the categorized seconds.

I've tried messing with the withTimestampCombiner(TimestampCombiner.EARLIEST) option, but that does not seem to address it.

I've read about the .withEarlyFirings method used with event time, but that seems like it would mimic my existing workaround. I would ideally be able to rely on the watermark passing the end of the window, and also include late triggering.
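The trigger I have in mind would look roughly like this (a sketch, not my actual code; the window sizes are taken from step 5, and the withLateFirings choice is an assumption about how late triggering could be included):

```java
// Sketch: event-time trigger that fires when the watermark passes the
// end of the window, then again for each late element that arrives
// within the allowed lateness. Window sizes assumed from step 5.
PCollection<String> eventTimeWindowed = dataPoints.apply(
    Window.<String>into(
            SlidingWindows.of(Duration.standardSeconds(5))
                .every(Duration.standardSeconds(1)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardSeconds(1))
        .discardingFiredPanes()
        .withTimestampCombiner(TimestampCombiner.EARLIEST));
```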

// De-batching the Pub/Sub message

  static public class UnpackDataPoints extends DoFn<String, String> {
      @ProcessElement
      public void processElement(@Element String c, OutputReceiver<String> out) {
          JsonArray packedData = new JsonParser().parse(c).getAsJsonArray();
          DateTimeFormatter dtf = DateTimeFormat.forPattern("EEE dd MMM YYYY HH:mm:ss:SSS zzz");
          for (JsonElement acDataPoint : packedData) {
              String hereData = acDataPoint.toString();
              // Parse the per-point millisecond timestamp and attach it as the event time.
              DateTime date = dtf.parseDateTime(acDataPoint.getAsJsonObject().get("Timestamp").getAsString());
              Instant eventTimeStamp = date.toInstant();
              out.outputWithTimestamp(hereData, eventTimeStamp);
          }
      }
  }
// Extracting the second

  static public class ExtractTimeStamp extends DoFn<String, KV<String, String>> {
      @ProcessElement
      public void processElement(@Element String c, OutputReceiver<KV<String, String>> out) {
          JsonObject accDataObject = new JsonParser().parse(c).getAsJsonObject();
          String milliString = accDataObject.get("Timestamp").getAsString();
          // Truncate the millisecond timestamp down to second resolution.
          String secondString = StringUtils.left(milliString, 24);
          accDataObject.addProperty("noMiliTimeStamp", secondString);
          String updatedAccData = accDataObject.toString();
          out.output(KV.of(secondString, updatedAccData));
      }
  }
// The pipeline & windowing

  Pipeline pipeline = Pipeline.create(options);

  PCollection<String> dataPoints = pipeline
      .apply("Read from Pubsub", PubsubIO.readStrings()
          .fromTopic("projects/????/topics/???")
          .withTimestampAttribute("messageTimestamp"))
      .apply("Extract Individual Data Points", ParDo.of(new UnpackDataPoints()));

  // This is the event-time window that doesn't fire for some reason
  /*
  PCollection<String> windowedDataPoints = dataPoints.apply(
      Window.<String>into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)))
          .triggering(AfterWatermark.pastEndOfWindow())
          // Variants also tried:
          // .triggering(AfterWatermark.pastEndOfWindow()
          //     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TWO_MINUTES)))
          // .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
          .discardingFiredPanes()
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withAllowedLateness(Duration.standardSeconds(1)));
  */

  // Temporary workaround: this does fire, but data is out of order
  PCollection<String> windowedDataPoints = dataPoints.apply(
      Window.<String>into(FixedWindows.of(Duration.standardMinutes(120)))
          .triggering(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.standardSeconds(5)))
          .discardingFiredPanes()
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withAllowedLateness(Duration.standardSeconds(1)));

  PCollection<KV<String, String>> timeStamped = windowedDataPoints
      .apply("Pulling Out The Second For Aggregates", ParDo.of(new ExtractTimeStamp()));

  PCollection<KV<String, Iterable<String>>> timeStampedGrouped = timeStamped.apply("Group By Key", GroupByKey.create());

  PCollection<KV<String, Iterable<String>>> testing = timeStampedGrouped.apply("testingIsh", ParDo.of(new LogKVIterable()));

When I use the first windowing strategy (the one commented out above), my pipeline runs indefinitely while receiving data and the LogKVIterable ParDo never outputs anything; when I use the processing-time workaround, LogKVIterable does fire and log to the console.

Have you checked that your timestamps are being set properly? It may be that your watermark is not advancing properly if the timestamps are not getting parsed as intended... — Pablo
Do you see your data freshness / system lag metrics keeping up with your pipeline? — Pablo

2 Answers


This really looks like the timestamp that you're adding to your data may be wrong or corrupt. I would encourage you to verify the following:

  1. The timestamp in your elements is correctly being added. Add some logging in the transforms before/after, and test that code extensively.

  2. The Data Freshness and System Lag metrics in your pipeline are progressing as you expect. If Data Freshness is not moving as expected, that is a strong indicator that your timestamp is not appropriately set.
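One quick way to act on point 1 is to check the timestamp parse outside the pipeline first. A minimal sketch (the class name and sample string are hypothetical) using java.time instead of the question's Joda-Time; note that the pattern here uses lowercase yyyy for the calendar year, since java.time treats uppercase Y as week-based year:

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class TimestampCheck {
    // Same shape as the question's "EEE dd MMM YYYY HH:mm:ss:SSS zzz",
    // but with yyyy (calendar year) for java.time semantics.
    static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("EEE dd MMM yyyy HH:mm:ss:SSS zzz", Locale.ENGLISH);

    // Parse a timestamp string and return epoch milliseconds, the value
    // that ends up driving the element's event time.
    static long parseMillis(String ts) {
        return ZonedDateTime.parse(ts, FMT).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        String sample = "Mon 01 Jan 2024 00:00:00:000 UTC";
        System.out.println(parseMillis(sample));
    }
}
```

If the printed epoch millis are far in the past or future for known-good input, the watermark will not advance as expected.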


Triggering on processing time is different from triggering on event time. In processing time, there is no such thing as late data. In event time, handling late data is the real challenge, and it is handled using watermarks and triggers. For a great guide on this, I recommend checking out these two articles by Googler Tyler Akidau: a, b.

Since in processing-time windowing there is no such thing as late data, it makes sense that your processing-time Apache Beam pipeline works without any problems.

Meanwhile, in event-time windowing late data can occur, and your windowing and triggering should be designed to handle those scenarios.

Most likely, your event-time pipeline code does not trigger due to bad configuration. It is not possible for me to reproduce your issue, since your watermark (for a Pub/Sub source) is determined heuristically. I recommend that you debug your code as follows. First: increase allowedLateness, for example to 1 hour. If this works, great! If not, see the second step. Second: comment out withEarlyFirings. If this works, great! If not, uncomment it and see the third step. Third: use fixed-time windows instead of sliding-time windows.
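The first debugging step can be sketched as follows (assuming the question's 5-second/1-second sliding windows; only allowedLateness changes):

```java
// Debugging sketch: same sliding windows and watermark trigger, but
// with allowedLateness raised from 1 second to 1 hour, to rule out
// elements being silently dropped as late.
PCollection<String> debugWindowed = dataPoints.apply(
    Window.<String>into(
            SlidingWindows.of(Duration.standardSeconds(5))
                .every(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardHours(1))
        .discardingFiredPanes());
```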

Continue debugging until you are able to isolate the issue.

:) :)