I'm struggling to get event-time-based triggers to fire in my Apache Beam pipeline, but I am able to trigger window firing with processing time.
My pipeline is fairly basic:
1. I receive batches of data points from Pub/Sub. Each data point carries a millisecond-level timestamp, and each message is read with a timestamp slightly earlier than the earliest batched data point. Data is batched to reduce client-side workload and Pub/Sub costs.
2. I extract second-level timestamps and apply event-time timestamps to the individual data points.
3. I window the data for processing, avoiding the global window.
4. I group the data by second for later categorization of the streaming data by second.
5. I eventually apply sliding windows to the categorized seconds and conditionally emit one of two messages to Pub/Sub once per second.
My problem appears to be in step 3.
I'm trying to use the same windowing strategy in step 3 that I'll eventually use in step 5 to run a sliding-average calculation over the categorized seconds.
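Roughly, the step 5 calculation I have in mind looks like this (just a sketch; perSecondValue is a hypothetical PCollection<KV<String, Double>> holding one numeric value per second-level key, and the per-key mean stands in for my real categorization logic):

// Sketch of the step 5 sliding average over per-second values.
PCollection<KV<String, Double>> slidingAverages = perSecondValue
        .apply("Sliding 5s Every 1s", Window.<KV<String, Double>>into(
                SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))))
        .apply("Per-Key Sliding Mean", Mean.<String, Double>perKey());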
I've tried messing with the withTimestampCombiner(TimestampCombiner.EARLIEST) option, but that does not seem to address it.
I've read about the .withEarlyFirings method used with event time, but that seems like it would just mimic my existing workaround. Ideally I'd like to rely on the watermark passing the end of the window, and also include late firings.
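If I understand the API correctly, withEarlyFirings is meant to be wired up roughly as below, with the early and late firings hanging off AfterWatermark.pastEndOfWindow() rather than off the Window transform itself (sketch only; the delays are placeholders):

// Watermark-driven trigger with speculative early firings and late firings.
Window.<String>into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(2)))
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardSeconds(1))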
// De-Batching The Pubsub Message
public static class UnpackDataPoints extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String c, OutputReceiver<String> out) {
        JsonArray packedData = new JsonParser().parse(c).getAsJsonArray();
        DateTimeFormatter dtf = DateTimeFormat.forPattern("EEE dd MMM YYYY HH:mm:ss:SSS zzz");
        for (JsonElement acDataPoint : packedData) {
            String hereData = acDataPoint.toString();
            DateTime date = dtf.parseDateTime(acDataPoint.getAsJsonObject().get("Timestamp").getAsString());
            Instant eventTimeStamp = date.toInstant();
            // Emit each unpacked data point with its own millisecond event-time timestamp.
            out.outputWithTimestamp(hereData, eventTimeStamp);
        }
    }
}
// Extracting The Second
public static class ExtractTimeStamp extends DoFn<String, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext ctx, @Element String c, OutputReceiver<KV<String, String>> out) {
        JsonObject accDataObject = new JsonParser().parse(c).getAsJsonObject();
        String milliString = accDataObject.get("Timestamp").getAsString();
        // Truncate the millisecond timestamp string down to second resolution and use it as the key.
        String secondString = StringUtils.left(milliString, 24);
        accDataObject.addProperty("noMiliTimeStamp", secondString);
        String updatedAccData = accDataObject.toString();
        KV<String, String> outputKV = KV.of(secondString, updatedAccData);
        out.output(outputKV);
    }
}
// The Pipeline & Windowing
Pipeline pipeline = Pipeline.create(options);

PCollection<String> dataPoints = pipeline
        .apply("Read from Pubsub", PubsubIO.readStrings()
                .fromTopic("projects/????/topics/???")
                .withTimestampAttribute("messageTimestamp"))
        .apply("Extract Individual Data Points", ParDo.of(new UnpackDataPoints()));

// This is the event time window that doesn't fire for some reason
/*
PCollection<String> windowedDataPoints = dataPoints.apply(
        Window.<String>into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)))
                // .triggering(AfterWatermark.pastEndOfWindow())
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(TWO_MINUTES))
                // .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
                .discardingFiredPanes()
                .withTimestampCombiner(TimestampCombiner.EARLIEST)
                .withAllowedLateness(Duration.standardSeconds(1)));
*/

// Temporary workaround: this does fire, but the data is out of order
PCollection<String> windowedDataPoints = dataPoints.apply(
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(120)))
                .triggering(AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(5)))
                .discardingFiredPanes()
                .withTimestampCombiner(TimestampCombiner.EARLIEST)
                .withAllowedLateness(Duration.standardSeconds(1)));

PCollection<KV<String, String>> TimeStamped = windowedDataPoints
        .apply("Pulling Out The Second For Aggregates", ParDo.of(new ExtractTimeStamp()));

PCollection<KV<String, Iterable<String>>> TimeStampedGrouped = TimeStamped
        .apply("Group By Key", GroupByKey.create());

PCollection<KV<String, Iterable<String>>> testing = TimeStampedGrouped
        .apply("testingIsh", ParDo.of(new LogKVIterable()));
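LogKVIterable is essentially just a pass-through DoFn that logs what it receives to the console, along these lines (simplified):

public static class LogKVIterable extends DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>> {
    @ProcessElement
    public void processElement(@Element KV<String, Iterable<String>> kv,
                               OutputReceiver<KV<String, Iterable<String>>> out) {
        // Log the grouped second key and its values, then pass the element through unchanged.
        System.out.println("Key: " + kv.getKey() + " Values: " + kv.getValue());
        out.output(kv);
    }
}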
When I use the first windowing strategy (the event-time one that's commented out), the pipeline runs indefinitely while receiving data and the LogKVIterable ParDo never outputs anything; when I use the processing-time workaround, LogKVIterable does fire and logs to the console.
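For reference, this is roughly how I'd expect the event-time firing to be exercisable in isolation, e.g. with TestStream on the DirectRunner, where the watermark is advanced explicitly (a sketch only; testPipeline is a separate pipeline and the elements/timestamps are made up):

// Sketch: TestStream advances the watermark explicitly, so the default
// AfterWatermark trigger on the sliding windows should fire without any
// processing-time trigger.
TestStream<String> events = TestStream.create(StringUtf8Coder.of())
        .addElements(
                TimestampedValue.of("point-a", new Instant(1000L)),
                TimestampedValue.of("point-b", new Instant(2000L)))
        .advanceWatermarkToInfinity();

PCollection<KV<String, Long>> counts = testPipeline
        .apply(events)
        .apply(Window.<String>into(
                SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))))
        .apply(Count.perElement());

testPipeline.run().waitUntilFinish();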